深入理解并实践 RocketMQ 的配置与操作指南
一、MQ介绍
1.1 为什么要用MQ
消息队列是一种“先进先出”的数据结构
其应用场景主要包含以下3个方面
1)应用解耦
系统的耦合性越高,容错性就越低。以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。
使用消息队列解耦合,系统的耦合性就会提高了。比如物流系统发生故障,需要几分钟才能来修复,在这段时间内,物流系统要处理的数据被缓存到消息队列中,用户的下单操作正常完成。当物流系统回复后,补充处理存在消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障。
2)流量削峰
应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提到系统的稳定性和用户体验。
一般情况,为了保证系统的稳定性,如果系统负载超过阈值,就会阻止用户请求,这会影响用户体验,而如果使用消息队列将请求缓存起来,等待系统处理完毕后通知用户下单完毕,这样总不能下单体验要好。
处于经济考量目的:
业务系统正常时段的QPS如果是1000,流量最高峰是10000,为了应对流量高峰配置高性能的服务器显然不划算,这时可以使用消息队列对峰值流量削峰
3)数据分发
通过消息队列可以让数据在多个系统更加之间进行流通。数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可
1.2 MQ的优点和缺点
优点:解耦、削峰、数据分发
缺点包含以下几点:
-
系统可用性降低
系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响。
如何保证MQ的高可用?
-
系统复杂度提高
MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。
如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
-
一致性问题
A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败。
如何保证消息数据处理的一致性?
1.3 各种MQ产品的比较
常见的MQ产品包括Kafka、ActiveMQ、RabbitMQ、RocketMQ。
二、RocketMQ快速入门
RocketMQ 是阿里巴巴2016年MQ中间件,使用 Java 语言开发,在阿里内部,RocketMQ 承接了例如“双11”等高并发场景的消息流转,能够处理万亿级别的消息。
2.1 准备工作
2.1.1 下载RocketMQ
这里选择的 RocketMQ 的版本:4.6.0
下载地址:下载地址
官方文档:rocketmq.apache.org/docs/quick-…
2.2.2 环境要求
-
Linux64位系统
-
JDK1.8(64位)
2.2 安装RocketMQ
2.2.1 安装步骤
我这里是以二进制包方式来安装的:
- 解压安装包
- 进入安装目录
2.2.2 目录介绍
-
bin
:启动脚本,包括 shell 脚本和 CMD 脚本 -
conf
:实例配置文件 ,包括 broker 配置文件、logback 配置文件等 -
lib
:依赖 jar 包,包括Netty、commons-lang、FastJSON等
2.3 启动RocketMQ
-
RocketMQ
默认的虚拟机内存较大,启动Broker
或者NameServer
可能会因为内存不足而导致失败,所以需要编辑如下两个配置文件,修改 JVM 内存大小# 编辑 runbroker.sh 和 runserver.sh 修改默认 JVM 大小 $ vi bin/runbroker.sh # 参考设置 JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m" $ vi bin/runserver.sh # 参考设置 JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
-
启动
NameServer
# 1.启动NameServer nohup sh bin/mqnamesrv & # 2.查看启动日志 tail -f ~/logs/rocketmqlogs/namesrv.log
-
启动
Broker
# 1.启动Broker nohup sh bin/mqbroker -n localhost:9876 & # 2.查看启动日志 tail -f ~/logs/rocketmqlogs/broker.log
bin/mqbroker
的一些可选参数:
-c
:指定配置文件路径-n
:NameServer 的地址
2.4 测试RocketMQ
2.4.1 发送消息
# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.使用安装包的Demo发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
2.4.2 接收消息
# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
2.5 关闭RocketMQ
# 1.关闭NameServer
sh bin/mqshutdown namesrv
# 2.关闭Broker
sh bin/mqshutdown broker
2.6 各角色介绍
-
Producer
:消息的发送者;举例:发件者 -
Consumer
:消息接收者;举例:收件人 -
Consumer Group
:消费组;每一个 consumer 实例都属于一个 consumer group,每一条消息只会被同一个 consumer group 里的一个 consumer 实例消费。(不同consumer group可以同时消费同一条消息) -
Broker
:暂存和传输消息;举例:快递公司 -
NameServer
:管理 Broker;举例:快递公司的管理机构 -
Topic
:区分消息的种类;一个发送者可以发送消息给一个或者多个 Topic;一个消息的接收者可以订阅一个或者多个 Topic 消息 -
Message Queue
:相当于是 Topic 的分区;用于并行发送和接收消息
2.7 broker配置文件详解
broker 默认的配置文件位置在:conf/broker.conf
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
2.8 可视化监控平台搭建
2.8.1 概述
RocketMQ
有一个对其扩展的开源项目 incubator-rocketmq-externals,这个项目中有一个子模块叫 rocketmq-console
,这个便是管理控制台项目了,先将 incubator-rocketmq-externals 拉到本地,因为我们需要自己对 rocketmq-console
进行编译打包运行。
2.8.2 下载并编译打包
- 克隆项目
git clone https://github.com/apache/rocketmq-externals
- 在
rocketmq-console
中配置namesrv
集群地址:
$ cd rocketmq-console
$ vim src/main/resources/application.properties
rocketmq.config.namesrvAddr=10.211.55.4:9876
-
配置完成进行编译并打包
mvn clean package -Dmaven.test.skip=true
-
启动 rocketmq-console:
nohup java -jar rocketmq-console-ng-2.0.0.jar > tmp.log &
启动成功后,我们就可以通过浏览器访问 http://IP地址:8080
进入控制台界面了,如下图:
三、消息发送与消费示例(Maven)
-
导入MQ客户端依赖
==注意==:
rocketmq-client
的版本,要与RocketMQ
的版本一致<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.6.0</version> </dependency>
-
消息发送者步骤分析:
- 创建消息生产者
producer
,并指定生产者组名 - 指定
Nameserver
地址 - 启动
producer
- 创建消息对象,指定主题
Topic
、Tag
和消息体 - 发送消息
- 关闭生产者
producer
- 创建消息生产者
-
消息消费者步骤分析:
- 创建消费者
Consumer
,制定消费者组名 - 指定
Nameserver
地址 - 订阅主题
Topic
和Tag
- 设置回调函数,处理消息
- 启动消费者
consumer
- 创建消费者
3.1 基本样例
3.1.1 消息发送
1)发送同步消息
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 设置消息同步发送失败时的重试次数,默认为 2
producer.setRetryTimesWhenSendFailed(2);
// 设置消息发送超时时间,默认3000ms
producer.setSendMsgTimeout(3000);
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
2)发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 设置消息异步发送失败时的重试次数,默认为 2
producer.setRetryTimesWhenSendAsyncFailed(2);
// 设置消息发送超时时间,默认3000ms
producer.setSendMsgTimeout(3000);
// 启动Producer实例
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 100; i++) {
final int index = i;
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收异步返回结果的回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
3)单向发送消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
public class OnewayProducer {
public static void main(String[] args) throws Exception{
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送单向消息,没有任何返回结果
producer.sendOneway(msg);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
3.1.2 消费消息
1)集群模式(负载均衡)
消费者采用集群方式消费消息,==一条消息同一个消费者组中只有一个消费者会消费到==
public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic
consumer.subscribe("Test", "*");
//负载均衡模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
2)广播模式
消费者采用广播的方式消费消息,==一条消息同一个消费者组中每个消费者都要消费==
public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic
consumer.subscribe("Test", "*");
//广播模式消费
consumer.setMessageModel(MessageModel.BROADCASTING);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
3.2 顺序消息
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
3.2.1 顺序消息生产
/**
* Producer,发送顺序消息
*/
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagC", "TagD"};
// 订单列表
List<OrderStep> orderList = new Producer().buildOrders();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
// 加个时间前缀
String body = dateStr + " Hello RocketMQ " + orderList.get(i);
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; //根据订单id选择发送queue
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());//订单id
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
producer.shutdown();
}
/**
* 订单的步骤
*/
private static class OrderStep {
private long orderId;
private String desc;
public long getOrderId() {
return orderId;
}
public void setOrderId(long orderId) {
this.orderId = orderId;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
}
/**
* 生成模拟订单数据
*/
private List<OrderStep> buildOrders() {
List<OrderStep> orderList = new ArrayList<OrderStep>();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
return orderList;
}
}
3.2.2 顺序消费消息
/**
* 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
*/
public class ConsumerInOrder {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("127.0.0.1:9876");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
try {
//模拟业务逻辑处理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
3.3 延时消息
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
3.3.1 启动消息消费者
public class ScheduledMessageConsumer {
public static void main(String[] args) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
// 订阅Topics
consumer.subscribe("TestTopic", "*");
// 注册消息监听者
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
}
}
3.3.2 发送延时消息
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// 启动生产者
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
message.setDelayTimeLevel(3);
// 发送消息
producer.send(message);
}
// 关闭生产者
producer.shutdown();
}
}
###4.3.3 验证
您将会看到消息的消费比存储时间晚10秒
3.3.4 使用限制
// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18
3.4 批量消息
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
3.4.1 发送批量消息
如果您每次只发送不超过 4MB
的消息,则很容易使用批处理,样例如下:
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
producer.send(messages);
} catch (Exception e) {
e.printStackTrace();
//处理error
}
如果消息的总长度可能大于 4MB
时,这时候最好把消息进行分割
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加日志的开销20字节
if (tmpSize > SIZE_LIMIT) {
//单个消息超过了最大的限制
//忽略,否则会阻塞分裂的进程
if (nextIndex - currIndex == 0) {
//假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
//处理error
}
}
3.5 过滤消息
在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子:
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 10 | --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 1 | --------------------> Missed
| b = 'abc'|
| c = true |
------------
3.5.1 SQL基本语法
RocketMQ 只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
- 数值比较,比如:
>
,>=
,<
,<=
,BETWEEN
,=
- 字符比较,比如:
=
,<>
,IN
-
IS NULL
或者IS NOT NULL
- 逻辑符号
AND
,OR
,NOT
常量支持类型为:
- 数值,比如:
123
,3.1415
- 字符,比如:
'abc'
,必须用单引号包裹起来 -
NULL
,特殊的常量 - 布尔值,
TRUE
或FALSE
只有使用 push
模式的消费者才能用使用SQL92标准的sql语句,接口如下:
public void subscribe(finalString topic, final MessageSelector messageSelector)
3.5.2 消息生产者
发送消息时,你能通过putUserProperty
来设置消息的属性
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",
tag,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 设置一些属性
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
producer.shutdown();
3.5.3 消息消费者
用MessageSelector.bySql来使用sql筛选消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
3.6 事务消息
3.6.1 流程分析
上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
####1)事务消息发送及提交
(1) 发送消息(half消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
2)事务补偿
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
3)事务消息状态
事务消息共有三种状态,提交状态、回滚状态、中间状态:
- TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
- TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
- TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
3.6.2 发送事务消息
1) 创建事务性
推荐阅读
-
深入理解C#中的事件与委托:一次看完的全面指南,并附带源代码
-
深入理解并实践ANSYS Workbench项目:案例分析与实操指南
-
深入理解并实践ANSYS Workbench项目:案例分析与实操指南
-
玩转现代TypeScript的高阶指南:深入理解泛型与类型操作艺术
-
深入理解并实践 RocketMQ 的配置与操作指南
-
【2022新手指南】Java编程进阶之路 - 六、技术架构篇 ### MySQL索引底层解析与优化实战 - 你会讲解MySQL索引的数据结构吗?性能调优技巧知多少? - Redis深度揭秘:你知道多少?从基础到哨兵、主从复制全梳理 - Redis持久化及哨兵模式详解,还有集群搭建和Leader选举黑箱打开 - Zookeeper是个啥?特性和应用场景大公开 - ZooKeeper集群搭建攻略及 Leader选举、读写一致性、共享锁实现细节 - 探究ZooKeeper中的Leader选举机制及其在分布式环境中的作用 - Zab协议深入剖析:原理、功能与在Zookeeper中的核心地位 - RabbitMQ全方位解读:工作模式、消费限流、可靠投递与配置策略 - 设计者视角:RabbitMQ过期时间、死信队列与延时队列实践指南 - RocketMQ特性和应用场景揭示:理解其精髓与差异化优势 - Kafka详细介绍:特性及广泛应用于实时数据处理的场景解析 - ElasticSearch实力揭秘:特性概述与作为搜索引擎的广泛应用 - MongoDB认知升级:非关系型数据库的优势阐述,安装与使用实战教学 - BIO/NIO/AIO网络模型对比:掌握它们的区别与在网络编程中的实际应用 - Netty带你飞:理解其超快速度背后的秘密,包括线程模型分析 - 网络通信黑科技:Netty编解码原理与常用编解码器的应用,Protostuff实战演示 - 解密Netty粘包与拆包现象,怎样有效应对这一常见问题 - 自定义Netty心跳检测机制,轻松调整检测间隔时间的艺术 - Dubbo轻骑兵介绍:核心特性概览,服务降级实战与其实现益处 - Dubbo三大神器解读:本地存根与本地伪装的实战运用与优势呈现 ----------------------- 七、结语与回顾
-
Photoshop初学者指南:理解并实践文件操作 - 打开、保存与创建新文档(第三部分 - PS界面解析)
-
三步阅读法详解:第三次阅读 - 消化与转化篇。深入研读后,我们领略了书籍精华,但关键在于将这些精彩之处内化为个人的知识。为了真正吸收并掌握,第三次阅读需聚焦于思考:书中知识如何能应用到自己生活或工作中?又该如何从不同角度实施?通过实践应用,进一步深化对原作的理解和领悟。
-
SSM三大框架基础面试题-一、Spring篇 什么是Spring框架? Spring是一种轻量级框架,提高开发人员的开发效率以及系统的可维护性。 我们一般说的Spring框架就是Spring Framework,它是很多模块的集合,使用这些模块可以很方便地协助我们进行开发。这些模块是核心容器、数据访问/集成、Web、AOP(面向切面编程)、工具、消息和测试模块。比如Core Container中的Core组件是Spring所有组件的核心,Beans组件和Context组件是实现IOC和DI的基础,AOP组件用来实现面向切面编程。 Spring的6个特征: 核心技术:依赖注入(DI),AOP,事件(Events),资源,i18n,验证,数据绑定,类型转换,SpEL。 测试:模拟对象,TestContext框架,Spring MVC测试,WebTestClient。 数据访问:事务,DAO支持,JDBC,ORM,编组XML。 Web支持:Spring MVC和Spring WebFlux Web框架。 集成:远程处理,JMS,JCA,JMX,电子邮件,任务,调度,缓存。 语言:Kotlin,Groovy,动态语言。 列举一些重要的Spring模块? Spring Core:核心,可以说Spring其他所有的功能都依赖于该类库。主要提供IOC和DI功能。 Spring Aspects:该模块为与AspectJ的集成提供支持。 Spring AOP:提供面向切面的编程实现。 Spring JDBC:Java数据库连接。 Spring JMS:Java消息服务。 Spring ORM:用于支持Hibernate等ORM工具。 Spring Web:为创建Web应用程序提供支持。 Spring Test:提供了对JUnit和TestNG测试的支持。 谈谈自己对于Spring IOC和AOP的理解 IOC(Inversion Of Controll,控制反转)是一种设计思想: 在程序中手动创建对象的控制权,交由给Spring框架来管理。IOC在其他语言中也有应用,并非Spring特有。IOC容器实际上就是一个Map(key, value),Map中存放的是各种对象。 将对象之间的相互依赖关系交给IOC容器来管理,并由IOC容器完成对象的注入。这样可以很大程度上简化应用的开发,把应用从复杂的依赖关系中解放出来。IOC容器就像是一个工厂一样,当我们需要创建一个对象的时候,只需要配置好配置文件/注解即可,完全不用考虑对象是如何被创建出来的。在实际项目中一个Service类可能由几百甚至上千个类作为它的底层,假如我们需要实例化这个Service,可能要每次都搞清楚这个Service所有底层类的构造函数,这可能会把人逼疯。如果利用IOC的话,你只需要配置好,然后在需要的地方引用就行了,大大增加了项目的可维护性且降低了开发难度。 Spring中的bean的作用域有哪些? 1.singleton:该bean实例为单例 2.prototype:每次请求都会创建一个新的bean实例(多例)。 3.request:每一次HTTP请求都会产生一个新的bean,该bean仅在当前HTTP request内有效。 4.session:每一次HTTP请求都会产生一个新的bean,该bean仅在当前HTTP session内有效。 5.global-session:全局session作用域,仅仅在基于Portlet的Web应用中才有意义,Spring5中已经没有了。Portlet是能够生成语义代码(例如HTML)片段的小型Java Web插件。它们基于Portlet容器,可以像Servlet一样处理HTTP请求。但是与Servlet不同,每个Portlet都有不同的会话。 Spring中的单例bean的线程安全问题了解吗? 概念用于理解:大部分时候我们并没有在系统中使用多线程,所以很少有人会关注这个问题。单例bean存在线程问题,主要是因为当多个线程操作同一个对象的时候,对这个对象的非静态成员变量的写操作会存在线程安全问题。 有两种常见的解决方案(用于回答的点): 1.在bean对象中尽量避免定义可变的成员变量(不太现实)。 2.在类中定义一个ThreadLocal成员变量,将需要的可变成员变量保存在ThreadLocal(线程本地化对象)中(推荐的一种方式)。 ThreadLocal解决多线程变量共享问题(参考博客):https://segmentfault.com/a/1190000009236777 Spring中Bean的生命周期: 1.Bean容器找到配置文件中Spring Bean的定义。 2.Bean容器利用Java Reflection API创建一个Bean的实例。 3.如果涉及到一些属性值,利用set方法设置一些属性值。 4.如果Bean实现了BeanNameAware接口,调用setBeanName方法,传入Bean的名字。 5.如果Bean实现了BeanClassLoaderAware接口,调用setBeanClassLoader方法,传入ClassLoader对象的实例。 6.如果Bean实现了BeanFactoryAware接口,调用setBeanClassFacotory方法,传入ClassLoader对象的实例。 7.与上面的类似,如果实现了其他*Aware接口,就调用相应的方法。 8.如果有和加载这个Bean的Spring容器相关的BeanPostProcessor对象,执postProcessBeforeInitialization方法。 9.如果Bean实现了InitializingBean接口,执行afeterPropertiesSet方法。 10.如果Bean在配置文件中的定义包含init-method属性,执行指定的方法。 11.如果有和加载这个Bean的Spring容器相关的BeanPostProcess对象,执行postProcessAfterInitialization方法。 12.当要销毁Bean的时候,如果Bean实现了DisposableBean接口,执行destroy方法。 13.当要销毁Bean的时候,如果Bean在配置文件中的定义包含destroy-method属性,执行指定的方法。 Spring框架中用到了哪些设计模式? 1.工厂设计模式:Spring使用工厂模式通过BeanFactory和ApplicationContext创建bean对象。 2.代理设计模式:Spring AOP功能的实现。 3.单例设计模式:Spring中的bean默认都是单例的。 4.模板方法模式:Spring中的jdbcTemplate、hibernateTemplate等以Template结尾的对数据库操作的类,它们就使用到了模板模式。 5.包装器设计模式:我们的项目需要连接多个数据库,而且不同的客户在每次访问中根据需要会去访问不同的数据库。这种模式让我们可以根据客户的需求能够动态切换不同的数据源。 6.观察者模式:Spring事件驱动模型就是观察者模式很经典的一个应用。 7.适配器模式:Spring AOP的增强或通知(Advice)使用到了适配器模式、Spring MVC中也是用到了适配器模式适配Controller。 还有很多。。。。。。。 @Component和@Bean的区别是什么 1.作用对象不同。@Component注解作用于类,而@Bean注解作用于方法。 2.@Component注解通常是通过类路径扫描来自动侦测以及自动装配到Spring容器中(我们可以使用@ComponentScan注解定义要扫描的路径)。@Bean注解通常是在标有该注解的方法中定义产生这个bean,告诉Spring这是某个类的实例,当我需要用它的时候还给我。 3.@Bean注解比@Component注解的自定义性更强,而且很多地方只能通过@Bean注解来注册bean。比如当引用第三方库的类需要装配到Spring容器的时候,就只能通过@Bean注解来实现。 @Configuration public class AppConfig { @Bean public TransferService transferService { return new TransferServiceImpl; } } <beans> <bean id="transferService" class="com.kk.TransferServiceImpl"/> </beans> @Bean public OneService getService(status) { case (status) { when 1: return new serviceImpl1; when 2: return new serviceImpl2; when 3: return new serviceImpl3; } } 将一个类声明为Spring的bean的注解有哪些? 声明bean的注解: @Component 组件,没有明确的角色 @Service 在业务逻辑层使用(service层) @Repository 在数据访问层使用(dao层) @Controller 在展现层使用,控制器的声明 注入bean的注解: @Autowired:由Spring提供 @Inject:由JSR-330提供 @Resource:由JSR-250提供 *扩:JSR 是 java 规范标准 Spring事务管理的方式有几种? 1.编程式事务:在代码中硬编码(不推荐使用)。 2.声明式事务:在配置文件中配置(推荐使用),分为基于XML的声明式事务和基于注解的声明式事务。 Spring事务中的隔离级别有哪几种? 在TransactionDefinition接口中定义了五个表示隔离级别的常量:ISOLATION_DEFAULT:使用后端数据库默认的隔离级别,Mysql默认采用的REPEATABLE_READ隔离级别;Oracle默认采用的READ_COMMITTED隔离级别。ISOLATION_READ_UNCOMMITTED:最低的隔离级别,允许读取尚未提交的数据变更,可能会导致脏读、幻读或不可重复读。ISOLATION_READ_COMMITTED:允许读取并发事务已经提交的数据,可以阻止脏读,但是幻读或不可重复读仍有可能发生ISOLATION_REPEATABLE_READ:对同一字段的多次读取结果都是一致的,除非数据是被本身事务自己所修改,可以阻止脏读和不可重复读,但幻读仍有可能发生。ISOLATION_SERIALIZABLE:最高的隔离级别,完全服从ACID的隔离级别。所有的事务依次逐个执行,这样事务之间就完全不可能产生干扰,也就是说,该级别可以防止脏读、不可重复读以及幻读。但是这将严重影响程序的性能。通常情况下也不会用到该级别。 Spring事务中有哪几种事务传播行为? 在TransactionDefinition接口中定义了八个表示事务传播行为的常量。 支持当前事务的情况:PROPAGATION_REQUIRED:如果当前存在事务,则加入该事务;如果当前没有事务,则创建一个新的事务。PROPAGATION_SUPPORTS: 如果当前存在事务,则加入该事务;如果当前没有事务,则以非事务的方式继续运行。PROPAGATION_MANDATORY: 如果当前存在事务,则加入该事务;如果当前没有事务,则抛出异常。(mandatory:强制性)。 不支持当前事务的情况:PROPAGATION_REQUIRES_NEW: 创建一个新的事务,如果当前存在事务,则把当前事务挂起。PROPAGATION_NOT_SUPPORTED: 以非事务方式运行,如果当前存在事务,则把当前事务挂起。PROPAGATION_NEVER: 以非事务方式运行,如果当前存在事务,则抛出异常。 其他情况:PROPAGATION_NESTED: 如果当前存在事务,则创建一个事务作为当前事务的嵌套事务来运行;如果当前没有事务,则该取值等价于PROPAGATION_REQUIRED。 二、SpringMVC篇 什么是Spring MVC ?简单介绍下你对springMVC的理解? Spring MVC是一个基于Java的实现了MVC设计模式的请求驱动类型的轻量级Web框架,通过把Model,View,Controller分离,将web层进行职责解耦,把复杂的web应用分成逻辑清晰的几部分,简化开发,减少出错,方便组内开发人员之间的配合。 Spring MVC的工作原理了解嘛? image.png Springmvc的优点: (1)可以支持各种视图技术,而不仅仅局限于JSP; (2)与Spring框架集成(如IoC容器、AOP等); (3)清晰的角色分配:前端控制器(dispatcherServlet) , 请求到处理器映射(handlerMapping), 处理器适配器(HandlerAdapter), 视图解析器(ViewResolver)。 (4) 支持各种请求资源的映射策略。 Spring MVC的主要组件? (1)前端控制器 DispatcherServlet(不需要程序员开发) 作用:接收请求、响应结果,相当于转发器,有了DispatcherServlet 就减少了其它组件之间的耦合度。 (2)处理器映射器HandlerMapping(不需要程序员开发) 作用:根据请求的URL来查找Handler (3)处理器适配器HandlerAdapter 注意:在编写Handler的时候要按照HandlerAdapter要求的规则去编写,这样适配器HandlerAdapter才可以正确的去执行Handler。 (4)处理器Handler(需要程序员开发) (5)视图解析器 ViewResolver(不需要程序员开发) 作用:进行视图的解析,根据视图逻辑名解析成真正的视图(view) (6)视图View(需要程序员开发jsp) View是一个接口, 它的实现类支持不同的视图类型(jsp,freemarker,pdf等等) springMVC和struts2的区别有哪些? (1)springmvc的入口是一个servlet即前端控制器(DispatchServlet),而struts2入口是一个filter过虑器(StrutsPrepareAndExecuteFilter)。 (2)springmvc是基于方法开发(一个url对应一个方法),请求参数传递到方法的形参,可以设计为单例或多例(建议单例),struts2是基于类开发,传递参数是通过类的属性,只能设计为多例。 (3)Struts采用值栈存储请求和响应的数据,通过OGNL存取数据,springmvc通过参数解析器是将request请求内容解析,并给方法形参赋值,将数据和视图封装成ModelAndView对象,最后又将ModelAndView中的模型数据通过reques域传输到页面。Jsp视图解析器默认使用jstl。 SpringMVC怎么样设定重定向和转发的? (1)转发:在返回值前面加"forward:",譬如"forward:user.do?name=method4" (2)重定向:在返回值前面加"redirect:",譬如"redirect:http://www.baidu.com" SpringMvc怎么和AJAX相互调用的? 通过Jackson框架就可以把Java里面的对象直接转化成Js可以识别的Json对象。具体步骤如下 : (1)加入Jackson.jar (2)在配置文件中配置json的映射 (3)在接受Ajax方法里面可以直接返回Object,List等,但方法前面要加上@ResponseBody注解。 如何解决POST请求中文乱码问题,GET的又如何处理呢? (1)解决post请求乱码问题: 在web.xml中配置一个CharacterEncodingFilter过滤器,设置成utf-8; <filter> <filter-name>CharacterEncodingFilter</filter-name> <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class> <init-param> <param-name>encoding</param-name> <param-value>utf-8</param-value> </init-param> </filter> <filter-mapping> <filter-name>CharacterEncodingFilter</filter-name> <url-pattern>/*</url-pattern> </filter-mapping> (2)get请求中文参数出现乱码解决方法有两个: ①修改tomcat配置文件添加编码与工程编码一致,如下: <ConnectorURIEncoding="utf-8" connectionTimeout="20000" port="8080" protocol="HTTP/1.1" redirectPort="8443"/> ②另外一种方法对参数进行重新编码: String userName = new String(request.getParamter("userName").getBytes("ISO8859-1"),"utf-8") ISO8859-1是tomcat默认编码,需要将tomcat编码后的内容按utf-8编码。 Spring MVC的异常处理 ? 统一异常处理: Spring MVC处理异常有3种方式: (1)使用Spring MVC提供的简单异常处理器SimpleMappingExceptionResolver; (2)实现Spring的异常处理接口HandlerExceptionResolver 自定义自己的异常处理器; (3)使用@ExceptionHandler注解实现异常处理; 统一异常处理的博客:https://blog.csdn.net/ctwy291314/article/details/81983103 SpringMVC的控制器是不是单例模式,如果是,有什么问题,怎么解决? 是单例模式,所以在多线程访问的时候有线程安全问题,不要用同步,会影响性能的,解决方案是在控制器里面不能写成员变量。(此题目类似于上面Spring 中 第5题 有两种解决方案) SpringMVC常用的注解有哪些? @RequestMapping:用于处理请求 url 映射的注解,可用于类或方法上。用于类上,则表示类中的所有响应请求的方法都是以该地址作为父路径。 @RequestBody:注解实现接收http请求的json数据,将json转换为java对象。 @ResponseBody:注解实现将conreoller方法返回对象转化为json对象响应给客户。 SpingMvc中的控制器的注解一般用那个,有没有别的注解可以替代? 一般用@Controller注解,也可以使用@RestController,@RestController注解相当于@ResponseBody + @Controller,表示是表现层,除此之外,一般不用别的注解代替。 如果在拦截请求中,我想拦截get方式提交的方法,怎么配置? 可以在@RequestMapping注解里面加上method=RequestMethod.GET。 怎样在方法里面得到Request,或者Session? 直接在方法的形参中声明request,SpringMVC就自动把request对象传入。 如果想在拦截的方法里面得到从前台传入的参数,怎么得到? 直接在形参里面声明这个参数就可以,但必须名字和传过来的参数一样。 如果前台有很多个参数传入,并且这些参数都是一个对象的,那么怎么样快速得到这个对象? 直接在方法中声明这个对象,SpringMVC就自动会把属性赋值到这个对象里面。 SpringMVC中函数的返回值是什么? 返回值可以有很多类型,有String, ModelAndView。ModelAndView类把视图和数据都合并的一起的。 SpringMVC用什么对象从后台向前台传递数据的? 通过ModelMap对象,可以在这个对象里面调用put方法,把对象加到里面,前台就可以拿到数据。 怎么样把ModelMap里面的数据放入Session里面? 可以在类上面加上@SessionAttributes注解,里面包含的字符串就是要放入session里面的key。 SpringMvc里面拦截器是怎么写的: 有两种写法,一种是实现HandlerInterceptor接口,另外一种是继承适配器类,接着在接口方法当中,实现处理逻辑;然后在SpringMvc的配置文件中配置拦截器即可: <!-- 配置SpringMvc的拦截器 --> <mvc:interceptors> <!-- 配置一个拦截器的Bean就可以了 默认是对所有请求都拦截 --> <bean id="myInterceptor" class="com.zwp.action.MyHandlerInterceptor"></bean> <!-- 只针对部分请求拦截 --> <mvc:interceptor> <mvc:mapping path="/modelMap.do" /> <bean class="com.zwp.action.MyHandlerInterceptorAdapter" /> </mvc:interceptor> </mvc:interceptors> 注解原理: 注解本质是一个继承了Annotation的特殊接口,其具体实现类是Java运行时生成的动态代理类。我们通过反射获取注解时,返回的是Java运行时生成的动态代理对象。通过代理对象调用自定义注解的方法,会最终调用AnnotationInvocationHandler的invoke方法。该方法会从memberValues这个Map中索引出对应的值。而memberValues的来源是Java常量池 三、Mybatis篇 什么是MyBatis? MyBatis是一个可以自定义SQL、存储过程和高级映射的持久层框架。 讲下MyBatis的缓存 MyBatis的缓存分为一级缓存和二级缓存,一级缓存放在session里面,默认就有, 二级缓存放在它的命名空间里,默认是不打开的,使用二级缓存属性类需要实现Serializable序列化接口, 可在它的映射文件中配置<cache/> Mybatis是如何进行分页的?分页插件的原理是什么? 1)Mybatis使用RowBounds对象进行分页,也可以直接编写sql实现分页,也可以使用Mybatis的分页插件。 2)分页插件的原理:实现Mybatis提供的接口,实现自定义插件,在插件的拦截方法内拦截待执行的sql,然后重写sql。 举例:select * from student,拦截sql后重写为:select t.* from (select * from student)t limit 0,10 简述Mybatis的插件运行原理,以及如何编写一个插件? 1)Mybatis仅可以编写针对ParameterHandler、ResultSetHandler、StatementHandler、 Executor这4种接口的插件,Mybatis通过动态代理, 为需要拦截的接口生成代理对象以实现接口方法拦截功能, 每当执行这4种接口对象的方法时,就会进入拦截方法, 具体就是InvocationHandler的invoke方法,当然, 只会拦截那些你指定需要拦截的方法。 2)实现Mybatis的Interceptor接口并复写intercept方法, 然后在给插件编写注解,指定要拦截哪一个接口的哪些方法即可, 记住,别忘了在配置文件中配置你编写的插件。 Mybatis动态sql是做什么的?都有哪些动态sql?能简述一下动态sql的执行原理不? 1)Mybatis动态sql可以让我们在Xml映射文件内, 以标签的形式编写动态sql,完成逻辑判断和动态拼接sql的功能。 2)Mybatis提供了9种动态sql标签:trim|where|set|foreach|if|choose|when|otherwise|bind。 3)其执行原理为,使用OGNL从sql参数对象中计算表达式的值, 根据表达式的值动态拼接sql,以此来完成动态sql的功能。 #{}和${}的区别是什么? 1)#{}是预编译处理,${}是字符串替换。 2)Mybatis在处理#{}时,会将sql中的#{}替换为?号,调用PreparedStatement的set方法来赋值(有效的防止SQL注入); 3)Mybatis在处理${}时,就是把${}替换成变量的值。 为什么说Mybatis是半自动ORM映射工具?它与全自动的区别在哪里? Hibernate属于全自动ORM映射工具, 使用Hibernate查询关联对象或者关联集合对象时, 可以根据对象关系模型直接获取,所以它是全自动的。 而Mybatis在查询关联对象或关联集合对象时, 需要手动编写sql来完成,所以,称之为半自动ORM映射工具。 Mybatis是否支持延迟加载?如果支持,它的实现原理是什么? 1)Mybatis仅支持association关联对象和collection关联集合对象的延迟加载, association指的就是一对一,collection指的就是一对多查询。 在Mybatis配置文件中, 可以配置是否启用延迟加载lazyLoadingEnabled=true|false。 2)它的原理是,使用CGLIB创建目标对象的代理对象, 当调用目标方法时,进入拦截器方法, 比如调用a.getB.getName, 拦截器invoke方法发现a.getB是null值, 那么就会单独发送事先保存好的查询关联B对象的sql, 把B查询上来,然后调用a.setB(b), 于是a的对象b属性就有值了, 接着完成a.getB.getName方法的调用。 这就是延迟加载的基本原理。 MyBatis与Hibernate有哪些不同? 1)Mybatis和hibernate不同,它不完全是一个ORM框架, 因为MyBatis需要程序员自己编写Sql语句, 不过mybatis可以通过XML或注解方式灵活配置要运行的sql语句, 并将java对象和sql语句映射生成最终执行的sql, 最后将sql执行的结果再映射生成java对象。 2)Mybatis学习门槛低,简单易学,程序员直接编写原生态sql, 可严格控制sql执行性能,灵活度高,非常适合对关系数据模型要求不高的软件开发, 例如互联网软件、企业运营类软件等,因为这类软件需求变化频繁, 一但需求变化要求成果输出迅速。但是灵活的前提是mybatis无法做到数据库无关性, 如果需要实现支持多种数据库的软件则需要自定义多套sql映射文件,工作量大。 3)Hibernate对象/关系映射能力强,数据库无关性好, 对于关系模型要求高的软件(例如需求固定的定制化软件) 如果用hibernate开发可以节省很多代码,提高效率。 但是Hibernate的缺点是学习门槛高,要精通门槛更高, 而且怎么设计O/R映射,在性能和对象模型之间如何权衡, 以及怎样用好Hibernate需要具有很强的经验和能力才行。 总之,按照用户的需求在有限的资源环境下只要能做出维护性、 扩展性良好的软件架构都是好架构,所以框架只有适合才是最好。 MyBatis的好处是什么? 1)MyBatis把sql语句从Java源程序中独立出来,放在单独的XML文件中编写, 给程序的维护带来了很大便利。 2)MyBatis封装了底层JDBC API的调用细节,并能自动将结果集转换成Java Bean对象, 大大简化了Java数据库编程的重复工作。 3)因为MyBatis需要程序员自己去编写sql语句, 程序员可以结合数据库自身的特点灵活控制sql语句, 因此能够实现比Hibernate等全自动orm框架更高的查询效率,能够完成复杂查询。 简述Mybatis的Xml映射文件和Mybatis内部数据结构之间的映射关系? Mybatis将所有Xml配置信息都封装到All-In-One重量级对象Configuration内部。 在Xml映射文件中,<parameterMap>标签会被解析为ParameterMap对象, 其每个子元素会被解析为ParameterMapping对象。 <resultMap>标签会被解析为ResultMap对象, 其每个子元素会被解析为ResultMapping对象。 每一个<select>、<insert>、<update>、<delete> 标签均会被解析为MappedStatement对象, 标签内的sql会被解析为BoundSql对象。 什么是MyBatis的接口绑定,有什么好处? 接口映射就是在MyBatis中任意定义接口,然后把接口里面的方法和SQL语句绑定, 我们直接调用接口方法就可以,这样比起原来了SqlSession提供的方法我们可以有更加灵活的选择和设置. 接口绑定有几种实现方式,分别是怎么实现的? 接口绑定有两种实现方式,一种是通过注解绑定,就是在接口的方法上面加 上@Select@Update等注解里面包含Sql语句来绑定, 另外一种就是通过xml里面写SQL来绑定,在这种情况下, 要指定xml映射文件里面的namespace必须为接口的全路径名. 什么情况下用注解绑定,什么情况下用xml绑定? 当Sql语句比较简单时候,用注解绑定;当SQL语句比较复杂时候,用xml绑定,一般用xml绑定的比较多 MyBatis实现一对一有几种方式?具体怎么操作的? 有联合查询和嵌套查询,联合查询是几个表联合查询,只查询一次, 通过在resultMap里面配置association节点配置一对一的类就可以完成; 嵌套查询是先查一个表,根据这个表里面的结果的外键id, 去再另外一个表里面查询数据,也是通过association配置, 但另外一个表的查询通过select属性配置。 Mybatis能执行一对一、一对多的关联查询吗?都有哪些实现方式,以及它们之间的区别? 能,Mybatis不仅可以执行一对一、一对多的关联查询, 还可以执行多对一,多对多的关联查询,多对一查询, 其实就是一对一查询,只需要把selectOne修改为selectList即可; 多对多查询,其实就是一对多查询,只需要把selectOne修改为selectList即可。 关联对象查询,有两种实现方式,一种是单独发送一个sql去查询关联对象, 赋给主对象,然后返回主对象。另一种是使用嵌套查询,嵌套查询的含义为使用join查询, 一部分列是A对象的属性值,另外一部分列是关联对象B的属性值, 好处是只发一个sql查询,就可以把主对象和其关联对象查出来。 MyBatis里面的动态Sql是怎么设定的?用什么语法? MyBatis里面的动态Sql一般是通过if节点来实现,通过OGNL语法来实现, 但是如果要写的完整,必须配合where,trim节点,where节点是判断包含节点有 内容就插入where,否则不插入,trim节点是用来判断如果动态语句是以and 或or 开始,那么会自动把这个and或者or取掉。 Mybatis是如何将sql执行结果封装为目标对象并返回的?都有哪些映射形式? 第一种是使用<resultMap>标签,逐一定义列名和对象属性名之间的映射关系。 第二种是使用sql列的别名功能,将列别名书写为对象属性名, 比如T_NAME AS NAME,对象属性名一般是name,小写, 但是列名不区分大小写,Mybatis会忽略列名大小写,
-
在SpringMVC中,理解并操作:重置风格与form表单的CURD实践