在ASP.NET中使用MSMQ进行消息处理
最编程
2024-01-21 18:06:21
...
MSMQ是微软消息队列的英文缩写。那么什么是消息队列?这些介绍网上一大片这里就不多说了。本文对于大虾级的人物来说这只是小玩意而已,对于初学者来说这文章还是有一定的帮助,希望路过的大虾们别笑话我班门弄斧。
一、MSMQ介绍和安装消息队列
关于MSMQ详细的介绍请大家向http://www.baidu.com/或http://www.g.cn/等专家咨询。
使用消息队列的优点:稳定、消息优先级、脱机能力以及安全性。
消息队列分为用户创建的队列(专用队列)和系统队列,用户队列分为,。我是Windows XP,看下图所示(myQueue为自己创建的消息队列,msmqtriggersnotifiations为通用队列):
对消息队列有了简单的了解后,使用MSMQ进行软件开发需要安装MSMQ,安装完后就该进入实际的开发阶段。具体的安装过程就是在控制面板里“添加/删除程序”下“添加/删除Windows组件”,完成添加就OK。安装完成后就可以通过交互界添加新的消息队列,详细如下图:
出了上面这种交互界面来创建MSMQ外,也可以通过编程来完成,.NET框架里的MessageQueue类下有一静态方法Create,用来完成消息队列的创建,其定义如下:
实现消息队列的创建简单代码(C#),创建一个名为"myQueue"的非事务性"消息队列",如下:
二、创建、删除和管理队列
在.NET环境下编写Message Queue程序的前提就是需要先安装MSMQ,本文之前已经作了详细的介绍。要开发MSMQ程序就必须学习一个很重要的类(MessageQueue),该类位于名称空间System.Messageing下。其中有几个常用的方法必须掌握:
--Create方法:创建使用指定路径的新消息队列。
--Delete方法:删除现有的消息队列。
--Existe方法:查看指定消息队列是否存在。
--GetAllMessages()方法:得到队列中的所有消息。
--GetPublicQueues方法:在“消息队列”网络中定位消息队列。
--Peek/BeginPeek方法:查看某个特定队列中的消息队列,但不从该队列中移出消息。
--Receive/BeginReceive方法:检索指定消息队列中最前面的消息并将其从该队列中移除。
--Send方法:发送消息到指定的消息队列。
--Purge方法:清空指定队列的消息。
上述列举的方法在此就不作详细介绍,大家可以通过下面的示例程序中来体会他们各自的功能。
三、发送和序列化消息
MSMQ消息队列中定义的消息由一个主体(body)和若干属性构成。消息的主体可以由文本、二进制构成,根据需要还可以被加密。在MSMQ 中消息的大小不能够超过4MB。发送消息是通过Send方法来完成的,需要一个Message参数。
1、发送消息:
步骤:连接队列-->指定消息格式-->提供要发送的数据(主体)-->调用Send()方法将消息发送出去。详细见后面的示例程序。
2、序列化消息:
消息序列化可以通过.NET Framework附带的三个预定义格式化程序来完成:
-- XMLMessageFormatter对象----MessageQueue组件的默认格式化程序设置。
-- BinaryMessageFormatter对象;
-- ActiveXMessageFormatter对象;
由于后两者格式化后的消息通常不能为人阅读,所以我们经常用到的是XMLMessageFormatter对象。该对象构造方法有三种重载:
四、读取和接收消息
1、读取消息:
也就是从指定队列中获取消息,详细请查看本文前面的关于消息操作的方法介绍。
2、接收消息有两种方式:
--> 通过Receive方法--具体功能请返回本文前面有详细介绍。
--> 通过Peek方法--具体功能请返回本文前面有详细介绍。
五、消息使用实例
通过上面一系列的介绍,了解了MessageQueue类和常用的方法后,下面我们通过一个简单的示例程序来分析消息队列的创建、发送消息以及接收消息等相关知识点:
1、通过Create方法创建使用指定路径的新消息队列
2、连接消息队列并发送消息到队列
3、连接消息队列并从消息队列中接收消息
4、连接队列并清空队列的全部消息
5、连接队列并获取队列的全部消息
上面依次的列举出来5个方法,这里我就不做测试了。上述方法全部通过测试的,我在后面提供个连接,没弄清楚的朋友可下载源程序自己去
一、MSMQ介绍和安装消息队列
关于MSMQ详细的介绍请大家向http://www.baidu.com/或http://www.g.cn/等专家咨询。
使用消息队列的优点:稳定、消息优先级、脱机能力以及安全性。
消息队列分为用户创建的队列(专用队列)和系统队列,用户队列分为,。我是Windows XP,看下图所示(myQueue为自己创建的消息队列,msmqtriggersnotifiations为通用队列):
对消息队列有了简单的了解后,使用MSMQ进行软件开发需要安装MSMQ,安装完后就该进入实际的开发阶段。具体的安装过程就是在控制面板里“添加/删除程序”下“添加/删除Windows组件”,完成添加就OK。安装完成后就可以通过交互界添加新的消息队列,详细如下图:
出了上面这种交互界面来创建MSMQ外,也可以通过编程来完成,.NET框架里的MessageQueue类下有一静态方法Create,用来完成消息队列的创建,其定义如下:
1//
2// 摘要:
3// 在指定的路径中创建非事务性“消息队列”队列。
4//
5// 参数:
6// path:
7// 要创建的队列的路径。
8//
9// 返回结果:
10// 表示新队列的 System.Messaging.MessageQueue。
11public static MessageQueue Create(string path);
12//
13// 摘要:
14// 在指定的路径中创建事务性或非事务性“消息队列”队列。
15//
16// 参数:
17// transactional:
18// 如果创建事务性队列,为 true;如果创建非事务性队列,则为 false。
19//
20// path:
21// 要创建的队列的路径。
22//
23// 返回结果:
24// 表示新队列的 System.Messaging.MessageQueue。
25public static MessageQueue Create(string path, bool transactional);
2// 摘要:
3// 在指定的路径中创建非事务性“消息队列”队列。
4//
5// 参数:
6// path:
7// 要创建的队列的路径。
8//
9// 返回结果:
10// 表示新队列的 System.Messaging.MessageQueue。
11public static MessageQueue Create(string path);
12//
13// 摘要:
14// 在指定的路径中创建事务性或非事务性“消息队列”队列。
15//
16// 参数:
17// transactional:
18// 如果创建事务性队列,为 true;如果创建非事务性队列,则为 false。
19//
20// path:
21// 要创建的队列的路径。
22//
23// 返回结果:
24// 表示新队列的 System.Messaging.MessageQueue。
25public static MessageQueue Create(string path, bool transactional);
实现消息队列的创建简单代码(C#),创建一个名为"myQueue"的非事务性"消息队列",如下:
MessageQueue.Create(@".\private$\myQueue");
二、创建、删除和管理队列
在.NET环境下编写Message Queue程序的前提就是需要先安装MSMQ,本文之前已经作了详细的介绍。要开发MSMQ程序就必须学习一个很重要的类(MessageQueue),该类位于名称空间System.Messageing下。其中有几个常用的方法必须掌握:
--Create方法:创建使用指定路径的新消息队列。
--Delete方法:删除现有的消息队列。
--Existe方法:查看指定消息队列是否存在。
--GetAllMessages()方法:得到队列中的所有消息。
--GetPublicQueues方法:在“消息队列”网络中定位消息队列。
--Peek/BeginPeek方法:查看某个特定队列中的消息队列,但不从该队列中移出消息。
--Receive/BeginReceive方法:检索指定消息队列中最前面的消息并将其从该队列中移除。
--Send方法:发送消息到指定的消息队列。
--Purge方法:清空指定队列的消息。
上述列举的方法在此就不作详细介绍,大家可以通过下面的示例程序中来体会他们各自的功能。
三、发送和序列化消息
MSMQ消息队列中定义的消息由一个主体(body)和若干属性构成。消息的主体可以由文本、二进制构成,根据需要还可以被加密。在MSMQ 中消息的大小不能够超过4MB。发送消息是通过Send方法来完成的,需要一个Message参数。
1、发送消息:
步骤:连接队列-->指定消息格式-->提供要发送的数据(主体)-->调用Send()方法将消息发送出去。详细见后面的示例程序。
2、序列化消息:
消息序列化可以通过.NET Framework附带的三个预定义格式化程序来完成:
-- XMLMessageFormatter对象----MessageQueue组件的默认格式化程序设置。
-- BinaryMessageFormatter对象;
-- ActiveXMessageFormatter对象;
由于后两者格式化后的消息通常不能为人阅读,所以我们经常用到的是XMLMessageFormatter对象。该对象构造方法有三种重载:
1public XmlMessageFormatter();
2public XmlMessageFormatter(string[] targetTypeNames);
3public XmlMessageFormatter(Type[] targetTypes);
如我们后面的示例程序中用到的序列化语句: 2public XmlMessageFormatter(string[] targetTypeNames);
3public XmlMessageFormatter(Type[] targetTypes);
1//序列化为字符串
2XmlMessageFormatter formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
2XmlMessageFormatter formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
四、读取和接收消息
1、读取消息:
也就是从指定队列中获取消息,详细请查看本文前面的关于消息操作的方法介绍。
2、接收消息有两种方式:
--> 通过Receive方法--具体功能请返回本文前面有详细介绍。
--> 通过Peek方法--具体功能请返回本文前面有详细介绍。
五、消息使用实例
通过上面一系列的介绍,了解了MessageQueue类和常用的方法后,下面我们通过一个简单的示例程序来分析消息队列的创建、发送消息以及接收消息等相关知识点:
1、通过Create方法创建使用指定路径的新消息队列
1/**//// <summary>
2/// 通过Create方法创建使用指定路径的新消息队列
3/// </summary>
4/// <param name="queuePath"></param>
5public static void Createqueue(string queuePath)
6{
7 try
8 {
9 if (!MessageQueue.Exists(queuePath))
10 {
11 MessageQueue.Create(@".\private$\myQueue");
12 }
13 else
14 {
15 Console.WriteLine(queuePath + "已经存在!");
16 }
17 }
18 catch (MessageQueueException e)
19 {
20 Console.WriteLine(e.Message);
21 }
22}
2/// 通过Create方法创建使用指定路径的新消息队列
3/// </summary>
4/// <param name="queuePath"></param>
5public static void Createqueue(string queuePath)
6{
7 try
8 {
9 if (!MessageQueue.Exists(queuePath))
10 {
11 MessageQueue.Create(@".\private$\myQueue");
12 }
13 else
14 {
15 Console.WriteLine(queuePath + "已经存在!");
16 }
17 }
18 catch (MessageQueueException e)
19 {
20 Console.WriteLine(e.Message);
21 }
22}
2、连接消息队列并发送消息到队列
1/**//// <summary>
2/// 连接消息队列并发送消息到队列
3/// </summary>
4public static void SendMessage()
5{
6 try
7 {
8 //连接到本地的队列
9 MessageQueue myQueue = new MessageQueue(".\\private$\\myQueue");
10
11 Message myMessage = new Message();
12 myMessage.Body = "消息内容";
13 myMessage.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
14 //发送消息到队列中
15 myQueue.Send(myMessage);
16 }
17 catch (ArgumentException e)
18 {
19 Console.WriteLine(e.Message);
20 }
21}
2/// 连接消息队列并发送消息到队列
3/// </summary>
4public static void SendMessage()
5{
6 try
7 {
8 //连接到本地的队列
9 MessageQueue myQueue = new MessageQueue(".\\private$\\myQueue");
10
11 Message myMessage = new Message();
12 myMessage.Body = "消息内容";
13 myMessage.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
14 //发送消息到队列中
15 myQueue.Send(myMessage);
16 }
17 catch (ArgumentException e)
18 {
19 Console.WriteLine(e.Message);
20 }
21}
3、连接消息队列并从消息队列中接收消息
1/**//// <summary>
2/// 连接消息队列并从队列中接收消息
3/// </summary>
4public static void ReceiveMessage()
5{
6 //连接到本地队列
7 MessageQueue myQueue = new MessageQueue(".\\private$\\myQueue");
8 myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
9 try
10 {
11 //从队列中接收消息
12 Message myMessage = myQueue.Receive();
13 string context = (string)myMessage.Body; //获取消息的内容
14 Console.WriteLine("消息内容为:" + context);
15 }
16 catch (MessageQueueException e)
17 {
18 Console.WriteLine(e.Message);
19 }
20 catch (InvalidCastException e)
21 {
22 Console.WriteLine(e.Message);
23 }
24}
2/// 连接消息队列并从队列中接收消息
3/// </summary>
4public static void ReceiveMessage()
5{
6 //连接到本地队列
7 MessageQueue myQueue = new MessageQueue(".\\private$\\myQueue");
8 myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
9 try
10 {
11 //从队列中接收消息
12 Message myMessage = myQueue.Receive();
13 string context = (string)myMessage.Body; //获取消息的内容
14 Console.WriteLine("消息内容为:" + context);
15 }
16 catch (MessageQueueException e)
17 {
18 Console.WriteLine(e.Message);
19 }
20 catch (InvalidCastException e)
21 {
22 Console.WriteLine(e.Message);
23 }
24}
4、连接队列并清空队列的全部消息
1/**//// <summary>
2/// 清空指定队列的消息
3/// </summary>
4public static void ClearMessage()
5{
6 MessageQueue myQueue = new MessageQueue(".\\private$\\myQueue");
7 myQueue.Purge();
8}
2/// 清空指定队列的消息
3/// </summary>
4public static void ClearMessage()
5{
6 MessageQueue myQueue = new MessageQueue(".\\private$\\myQueue");
7 myQueue.Purge();
8}
5、连接队列并获取队列的全部消息
1/**//// <summary>
2/// 连接队列并获取队列的全部消息
3/// </summary>
4public static void GetAllMessage()
5{
6 //连接到本地队列
7 MessageQueue myQueue = new MessageQueue(".\\private$\\myQueue");
8 Message[] message = myQueue.GetAllMessages();
9 XmlMessageFormatter formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
10 for (int i = 0; i < message.Length; i++)
11 {
12 message[i].Formatter = formatter;
13 Console.WriteLine(message[i].Body.ToString());
14 }
15}
2/// 连接队列并获取队列的全部消息
3/// </summary>
4public static void GetAllMessage()
5{
6 //连接到本地队列
7 MessageQueue myQueue = new MessageQueue(".\\private$\\myQueue");
8 Message[] message = myQueue.GetAllMessages();
9 XmlMessageFormatter formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
10 for (int i = 0; i < message.Length; i++)
11 {
12 message[i].Formatter = formatter;
13 Console.WriteLine(message[i].Body.ToString());
14 }
15}
上面依次的列举出来5个方法,这里我就不做测试了。上述方法全部通过测试的,我在后面提供个连接,没弄清楚的朋友可下载源程序自己去
推荐阅读
-
epoll简介及触发模式(accept、read、send)-epoll的简单介绍 epoll在LT和ET模式下的读写方式 一、epoll的接口非常简单,一共就三个函数:1. int epoll_create(int size);创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。这个参数不同于select中的第一个参数,给出最大监听的fd+1的值。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close关闭,否则可能导致fd被耗尽。2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);epoll的事件注册函数,它不同与select是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。第一个参数是epoll_create的返回值,第二个参数表示动作,用三个宏来表示:EPOLL_CTL_ADD:注册新的fd到epfd中;EPOLL_CTL_MOD:修改已经注册的fd的监听事件;EPOLL_CTL_DEL:从epfd中删除一个fd;第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */};events可以是以下几个宏的集合:EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭); EPOLLIN事件:EPOLLIN事件则只有当对端有数据写入时才会触发,所以触发一次后需要不断读取所有数据直到读完EAGAIN为止。否则剩下的数据只有在下次对端有写入时才能一起取出来了。现在明白为什么说epoll必须要求异步socket了吧?如果同步socket,而且要求读完所有数据,那么最终就会在堵死在阻塞里。 EPOLLOUT:表示对应的文件描述符可以写; EPOLLOUT事件:EPOLLOUT事件只有在连接时触发一次,表示可写,其他时候想要触发,那要先准备好下面条件:1.某次write,写满了发送缓冲区,返回错误码为EAGAIN。2.对端读取了一些数据,又重新可写了,此时会触发EPOLLOUT。简单地说:EPOLLOUT事件只有在不可写到可写的转变时刻,才会触发一次,所以叫边缘触发,这叫法没错的!其实,如果真的想强制触发一次,也是有办法的,直接调用epoll_ctl重新设置一下event就可以了,event跟原来的设置一模一样都行(但必须包含EPOLLOUT),关键是重新设置,就会马上触发一次EPOLLOUT事件。1. 缓冲区由满变空.2.同时注册EPOLLIN | EPOLLOUT事件,也会触发一次EPOLLOUT事件这个两个也会触发EPOLLOUT事件 EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);EPOLLERR:表示对应的文件描述符发生错误;EPOLLHUP:表示对应的文件描述符被挂断;EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);等待事件的产生,类似于select调用。参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。-------------------------------------------------------------------------------------------- 从man手册中,得到ET和LT的具体描述如下EPOLL事件有两种模型:Edge Triggered (ET)Level Triggered (LT)假如有这样一个例子:1. 我们已经把一个用来从管道中读取数据的文件句柄(RFD)添加到epoll描述符2. 这个时候从管道的另一端被写入了2KB的数据3. 调用epoll_wait(2),并且它会返回RFD,说明它已经准备好读取操作4. 然后我们读取了1KB的数据5. 调用epoll_wait(2)......Edge Triggered 工作模式:如果我们在第1步将RFD添加到epoll描述符的时候使用了EPOLLET标志,那么在第5步调用epoll_wait(2)之后将有可能会挂起,因为剩余的数据还存在于文件的输入缓冲区内,而且数据发出端还在等待一个针对已经发出数据的反馈信息。只有在监视的文件句柄上发生了某个事件的时候 ET 工作模式才会汇报事件。因此在第5步的时候,调用者可能会放弃等待仍在存在于文件输入缓冲区内的剩余数据。在上面的例子中,会有一个事件产生在RFD句柄上,因为在第2步执行了一个写操作,然后,事件将会在第3步被销毁。因为第4步的读取操作没有读空文件输入缓冲区内的数据,因此我们在第5步调用 epoll_wait(2)完成后,是否挂起是不确定的。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。最好以下面的方式调用ET模式的epoll接口,在后面会介绍避免可能的缺陷。 i 基于非阻塞文件句柄 ii 只有当read(2)或者write(2)返回EAGAIN时才需要挂起,等待。但这并不是说每次read时都需要循环读,直到读到产生一个EAGAIN才认为此次事件处理完成,当read返回的读到的数据长度小于请求的数据长度时,就可以确定此时缓冲中已没有数据了,也就可以认为此事读事件已处理完成。Level Triggered 工作模式相反的,以LT方式调用epoll接口的时候,它就相当于一个速度比较快的poll(2),并且无论后面的数据是否被使用,因此他们具有同样的职能。因为即使使用ET模式的epoll,在收到多个chunk的数据的时候仍然会产生多个事件。调用者可以设定EPOLLONESHOT标志,在 epoll_wait(2)收到事件后epoll会与事件关联的文件句柄从epoll描述符中禁止掉。因此当EPOLLONESHOT设定后,使用带有 EPOLL_CTL_MOD标志的epoll_ctl(2)处理文件句柄就成为调用者必须作的事情。然后详细解释ET, LT:LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表.ET(edge-triggered)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once),不过在TCP协议中,ET模式的加速效用仍需要更多的benchmark确认(这句话不理解)。在许多测试中我们会看到如果没有大量的idle -connection或者dead-connection,epoll的效率并不会比select/poll高很多,但是当我们遇到大量的idle- connection(例如WAN环境中存在大量的慢速连接),就会发现epoll的效率大大高于select/poll。(未测试)另外,当使用epoll的ET模型来工作时,当产生了一个EPOLLIN事件后,读数据的时候需要考虑的是当recv返回的大小如果等于请求的大小,那么很有可能是缓冲区还有数据未读完,也意味着该次事件还没有处理完,所以还需要再次读取: 这里只是说明思路(参考《UNIX网络编程》) while(rs) {buflen = recv(activeevents[i].data.fd, buf, sizeof(buf), 0);if(buflen < 0){// 由于是非阻塞的模式,所以当errno为EAGAIN时,表示当前缓冲区已无数据可读// 在这里就当作是该次事件已处理处.if(errno == EAGAIN)break; else return; }else if(buflen == 0) { // 这里表示对端的socket已正常关闭. } if(buflen == sizeof(buf) rs = 1; // 需要再次读取 else rs = 0; } 还有,假如发送端流量大于接收端的流量(意思是epoll所在的程序读比转发的socket要快),由于是非阻塞的socket,那么send函数虽然返回,但实际缓冲区的数据并未真正发给接收端,这样不断的读和发,当缓冲区满后会产生EAGAIN错误(参考man send),同时,不理会这次请求发送的数据.所以,需要封装socket_send的函数用来处理这种情况,该函数会尽量将数据写完再返回,返回-1表示出错。在socket_send内部,当写缓冲已满(send返回-1,且errno为EAGAIN),那么会等待后再重试.这种方式并不很完美,在理论上可能会长时间的阻塞在socket_send内部,但暂没有更好的办法. ssize_t socket_send(int sockfd, const char* buffer, size_t buflen) { ssize_t tmp; size_t total = buflen; const char *p = buffer; while(1) { tmp = send(sockfd, p, total, 0); if(tmp < 0) { // 当send收到信号时,可以继续写,但这里返回-1. if(errno == EINTR) return -1; // 当socket是非阻塞时,如返回此错误,表示写缓冲队列已满, // 在这里做延时后再重试. if(errno == EAGAIN) { usleep(1000); continue; } return -1; } if((size_t)tmp == total) return buflen; total -= tmp; p += tmp; } return tmp; } 二、epoll在LT和ET模式下的读写方式 在一个非阻塞的socket上调用read/write函数, 返回EAGAIN或者EWOULDBLOCK(注: EAGAIN就是EWOULDBLOCK) 从字面上看, 意思是: * EAGAIN: 再试一次 * EWOULDBLOCK: 如果这是一个阻塞socket, 操作将被block * perror输出: Resource temporarily unavailable 总结: 这个错误表示资源暂时不够, 可能read时, 读缓冲区没有数据, 或者, write时,写缓冲区满了 。 遇到这种情况, 如果是阻塞socket, read/write就要阻塞掉。 而如果是非阻塞socket, read/write立即返回-1, 同 时errno设置为EAGAIN. 所以, 对于阻塞socket, read/write返回-1代表网络出错了. 但对于非阻塞socket, read/write返回-1不一定网络真的出错了. 可能是Resource temporarily unavailable. 这时你应该再试, 直到Resource available. 综上, 对于non-blocking的socket, 正确的读写操作为: 读: 忽略掉errno = EAGAIN的错误, 下次继续读 写: 忽略掉errno = EAGAIN的错误, 下次继续写 对于select和epoll的LT模式, 这种读写方式是没有问题的. 但对于epoll的ET模式, 这种方式还有漏洞. epoll的两种模式 LT 和 ET
-
基于 NFC 的无线电池管理 BMS - ● 主动读取内部传感器:利用 NFC 技术,BMS 能够主动读取内部传感器的数据 [... 考虑车辆外使用案例中的空闲状态场景:NFC 技术可用于处理闲置状态下的电池组读取,例如在第二次生命转移期间进行存储。 主动诊断读取:在邻近系统中部署了 BMS 的情况下,使用 NFC 技术进行主动诊断读取。 (ii) 系统结构 系统架构如图所示,在建立安全通道之前,需要对设备进行身份验证。数据链路通信层由 NDEF 记录处理,而数据存储可以是离线的,也可以是数据库中的在线存储。活动和空闲状态的诊断读数取决于设备和数据方向,需要与外部 NFC 阅读器进行通信。软件架构分为三层,包括硬件抽象层(HAL)、中间层(中间件)和应用层。HAL 处理硬件驱动组件,中间件执行设备验证,而应用层则由开发人员根据安全漏洞和格式扩展*定义。 为确保安全,系统采用了一个安全模型,为 BMS 和主动诊断读取情况格式化应用数据。安全考虑因素包括设备相互验证、使用安全通道(加密和防篡改)以及确保电池组内读数的安全。 考虑到不同的 BMS 拓扑,包括集中式、调制式、分布式和分散式,系统需要满足设备相互验证和使用安全通道的要求。对于每种拓扑结构,都必须考虑将性能开销降至最低。电池是封闭的,对其进行物理攻击不可行或成本太高。外部攻击可能也很困难。基于对称或非对称加密技术的自动验证可用于保护电池组读数。安全协议在验证阶段和会话密钥确认阶段采用双密钥加密,以抵御攻击。中间件在数据格式验证、确认和处理中发挥关键作用,确保数据传输安全。 (iii) 唤醒模型设计
-
紧急模式问题处理 - 图 1 紧急模式 根本原因分析 应急模式提供了尽可能小的环境,即使无法进入应急模式,也可以在其中修复系统。在应急模式下,系统只安装根文件系统供读取,不尝试安装任何其他本地文件系统,不激活网络接口,只启动一些基本服务。 进入应急模式的原因通常是 /etc/fstab 文件中存在错误,导致文件系统挂载失败。 文件系统中存在错误,导致。 约束和限制 本节适用于 Linux 操作系统紧急模式。程序涉及修复文件系统。修复文件系统有丢失数据的风险,因此请先备份数据,然后再执行修复操作。 处理方法 输入根密码,然后进入修复模式。 在应急模式下,根分区以只读模式挂载。要修改根目录中的文件,需要执行以下命令以读写模式重新挂载根分区。# mount -o rw,remount / 请执行以下命令首先检查 fstab 文件是否有误,然后尝试挂载所有未挂载的文件系统。# mount -a 如果挂载点不存在,请创建一个挂载点。 如果不存在此类设备,请注释或删除挂载行。 如果指定了不正确的挂载选项,请将挂载参数更改为正确的参数。 如果没有发生错误,但出现 UNEXPECTED INCONSISTENCY;RUN fsck MANUALLY 消息(通常是由文件系统错误引起的),请跳至第 7 步。 执行以下命令打开 /etc/fstab 以修改相应的错误。# vi /etc/fstab /etc/fstab 文件包含以下字段,以空格分隔:[文件系统] [dir] [type] [options] [dump] [fsck] 表 1 /etc/fstab 参数 说明 参数 说明 [文件系统] 要挂载的分区或存储设备。 文件系统]列建议以 UUID 的形式写入。执行 blkid 命令可查询设备文件系统 UUID。 参考格式如下: # <device> <dir> <type> <options> <dump> <fsck>; UUID=b411dc99-f0a0-4c87-9e05-184977be8539 /home ext4 defaults 0 2 使用 UUID 的好处是,它们与磁盘顺序无关。如果你在 BIOS 中更改了存储设备的顺序,或重新插入了存储设备,或者因为某些 BIOS 可能会随机更改存储设备的顺序,那么使用 UUID 会更有效率。 [文件系统] 文件系统]的挂载位置。 类型 挂载设备或分区的文件系统类型,支持多种不同的文件系统:ext2、ext3、ext4、reiserfs、xfs、jfs、smbfs、iso9660、vfat、ntfs、swap 和 auto。 设置为自动类型后,挂载命令会猜测所使用的文件系统类型,这对 CDROM 和 DVD 等移动设备非常有用。 选项 挂载时要使用的参数,有些参数是特定文件系统特有的。例如,默认值参数使用文件系统的默认挂载参数,ext4 的默认参数为:rw、suid、dev、exec、auto、nouser、async。 有关更多参数,请执行以下命令查看 man 手册:# man mount
-
趣谈留言队列,搞清楚留言队列到底是什么!-说到消息队列,洪觉大概能猜到人们听到消息队列的反应,大致可以分为以下几类人。 第一类人,懵懵懂懂,刚上大学接触编程,还没用过消息队列,甚至还以为消息队列就是代码里面要新建一个List之类的;第二类人,听过消息队列,了解消息队列,但具体是什么还不是太明白,只知道一说到消息队列,脑海里马上出现了三组词,削峰、异步、解耦;第三类人,用过消息队列,对它有一定了解,但不知道为什么要这样设计,消息队列有什么样的前世今生,是如何演化到现在的模式的?**第四类人,已经对消息队列有了足够的了解,可以阅读本帖作为复习和温习。**你属于哪一类?无论你对消息队列了解多少,读完这篇文章后,我相信你都会有所收获。 什么是消息队列?我们为什么要使用消息队列?真的只是因为它看起来很勉强、很常用吗?当然不是,一项技术的出现往往是为了解决某种痛点,我们就从这个痛点出发,看看消息队列到底是为了解决什么问题而诞生的。 相信大家在工作之前,或者工作中接触单片机的次数会多一点,不管什么业务都一股脑塞进一个系统里,这种情况下接触消息队列的场景会比较少。但随着业务的增长,量上去了,单机系统就很难维护了,也扛不住并发量的增长,就需要把原来的单体应用拆分成多个服务。例如,牛奇网采用分布式架构,将原来的单体系统拆分成用户服务、题库服务、求职服务、论坛服务等,每个分布式节点都有一个集群,保证高可用性。 那虽然在这样的微服务架构下,如果某个核心业务并发量过大,系统就扛不住了。比如淘宝、淘票票、拼多多、京东等电商场景中的支付场景,你在某宝下单并支付后,调用支付服务,完成支付后,还需要更新订单的状态,这个时候就需要调用订单服务,那我们平时也下单,除了简单完成这些操作外,还会给你相应的积分;商家也会收到订单消息,并给您发送旺旺消息,确认订单无误;同时,也会给您发送消息,确认订单无误。确认订单无误;同时您还可以查看您的物流状态;还有系统为了给您推荐更适合您的商品,会根据您的订单做类似的推荐等等,我说的这些都是当我们下单后,肉眼可以感知到系统所做的动作。 **一个支付动作如果还需要调用那么多服务,等他们响应成功,最后再告诉用户你支付成功了,用户在系统中的整个体验会非常糟糕。**设想一下,假设请求服务+处理请求+响应总共需要 50ms,我们上面列出的场景:支付服务、订单服务、积分服务、商家服务、物流服务、推荐服务,总共需要 300ms。
-
.NET高级面试指南 Topic XVIII [ 介绍外观模式(Appearance Pattern),该模式提供了一个隐藏系统复杂性的简化界面 ]。- 简化复杂系统:当系统具有复杂的子系统结构时,可以使用外观模式来简化界面。提供统一界面:当客户端需要访问多个子系统时,可以使用外观模式提供统一界面。 外观模式在现代软件开发中得到广泛应用,尤其是在复杂系统中。例如 图形用户界面库:许多图形用户界面库(如 Qt、GTK+ 等)都使用外观模式来隐藏底层的复杂性,并为开发人员提供简单的界面来创建用户界面。 操作系统接口:操作系统中的系统调用和应用程序接口通常也使用外观模式来隐藏底层硬件和系统的复杂性,为应用程序提供访问系统资源的简单接口。企业应用程序:在可能涉及多个子系统的大型企业应用程序中,外观模式可用于封装这些子系统,并为客户端提供统一的使用界面。 网络框架:许多网络框架(如 ASP.NET MVC、Spring MVC 等)也使用外观模式来隐藏底层的复杂性,并为开发人员提供简单的接口来处理 HTTP 请求和响应。 集成开发环境(IDE):集成开发环境通常包含代码编辑器、编译器、调试器等多种功能。外观模式可用于封装这些功能,并为开发人员提供开发软件的简单界面。 代码示例:
-
Android 开发中 nodpi、xhdpi、hdpi、mdpi、ldpi 的概念 - 术语和概念 屏幕尺寸 屏幕的物理尺寸,基于屏幕的对角线长度(如 2.8 英寸、3.5 英寸)。 简而言之,安卓系统将所有屏幕尺寸简化为三大类:大、普通和小。 程序可以为这三种屏幕尺寸提供三种不同的布局选项,然后系统会以合适的方式将布局选项呈现到相应的屏幕上,这个过程不需要程序员用代码进行干预。 屏幕纵横比 屏幕的物理长度与物理宽度之比。程序只需使用系统提供的资源分类器 long(长)和 notlong(不长),就能为具有特定长宽比的屏幕提供配制材料。 分辨率 屏幕的像素总数。请注意,分辨率并不意味着长宽比,尽管在大多数情况下,分辨率表示为 "宽度 x 长度"。在安卓系统中,程序一般不直接处理分辨率。 密度 根据屏幕分辨率,沿屏幕宽度和长度排列的像素数量。 密度较低的屏幕在长度和宽度方向上的像素都相对较少,而密度较高的屏幕通常会在同一区域内排列很多甚至非常非常多的像素。屏幕的密度非常重要;例如,一个界面元素(如按钮)的长度和宽度以像素为单位,在低密度屏幕上会显得很大,但在高密度屏幕上就会显得很小。 独立于密度的像素(DIP)是指程序用来定义界面元素的抽象意义上的像素。它作为一个与实际密度无关的单位,帮助程序员构建布局方案(界面元素的宽度、高度和位置)。 与密度无关的像素在逻辑上与像素密度为 160 DPI 的屏幕上的像素大小相同,而 160 DPI 是安卓平台默认的显示设备。在运行时,平台会以目标屏幕的密度为基准,"透明 "地处理所有所需的 DIP 缩放操作。要将与密度无关的像素转换为屏幕像素,可以使用一个简单的公式:像素 = DIP * (密度 / 160)。例如,在 240 DPI 的屏幕上,1 个 DIP 等于 1.5 个物理像素。强烈建议使用 DIP 来定义程序界面的布局,因为这样可以确保用户界面在所有分辨率的屏幕上都能正常显示。 为了简化程序员在面对各种分辨率时的麻烦,也为了让各种分辨率的平台都能直接运行这些程序,Android 平台将所有屏幕以密度和分辨率作为分类方式,分别分为三类:- 三大尺寸:大、普通、小;- 三种不同密度:高(hdpi)、中(mdpi)和低(ldpi)。DPI 表示 "每英寸点数",即每英寸的像素数。如果需要,程序可以为不同的屏幕尺寸提供不同的资源(主要是布局),为不同的屏幕密度提供不同的资源(主要是位图)。除此之外,程序无需对屏幕尺寸或密度进行任何额外处理。执行时,平台会根据屏幕本身的尺寸和密度特性自动加载相应的资源,并将其从逻辑像素(DIP,用于定义界面布局)转换为屏幕上的物理像素。
-
windows下进程间通信的(13种方法)-摘 要 本文讨论了进程间通信与应用程序间通信的含义及相应的实现技术,并对这些技术的原理、特性等进行了深入的分析和比较。 ---- 关键词 信号 管道 消息队列 共享存储段 信号灯 远程过程调用 Socket套接字 MQSeries 1 引言 ---- 进程间通信的主要目的是实现同一计算机系统内部的相互协作的进程之间的数据共享与信息交换,由于这些进程处于同一软件和硬件环境下,利用操作系统提供的的编程接口,用户可以方便地在程序中实现这种通信;应用程序间通信的主要目的是实现不同计算机系统中的相互协作的应用程序之间的数据共享与信息交换,由于应用程序分别运行在不同计算机系统中,它们之间要通过网络之间的协议才能实现数据共享与信息交换。进程间通信和应用程序间通信及相应的实现技术有许多相同之处,也各有自己的特色。即使是同一类型的通信也有多种的实现方法,以适应不同情况的需要。 ---- 为了充分认识和掌握这两种通信及相应的实现技术,本文将就以下几个方面对这两种通信进行深入的讨论:问题的由来、解决问题的策略和方法、每种方法的工作原理和实现、每种实现方法的特点和适用的范围等。 2 进程间的通信及其实现技术 ---- 用户提交给计算机的任务最终都是通过一个个的进程来完成的。在一组并发进程中的任何两个进程之间,如果都不存在公共变量,则称该组进程为不相交的。在不相交的进程组中,每个进程都独立于其它进程,它的运行环境与顺序程序一样,而且它的运行环境也不为别的进程所改变。运行的结果是确定的,不会发生与时间相关的错误。 ---- 但是,在实际中,并发进程的各个进程之间并不是完全互相独立的,它们之间往往存在着相互制约的关系。进程之间的相互制约关系表现为两种方式: ---- (1) 间接相互制约:共享CPU ---- (2) 直接相互制约:竞争和协作 ---- 竞争——进程对共享资源的竞争。为保证进程互斥地访问共享资源,各进程必须互斥地进入各自的临界段。 ---- 协作——进程之间交换数据。为完成一个共同任务而同时运行的一组进程称为同组进程,它们之间必须交换数据,以达到协作完成任务的目的,交换数据可以通知对方可以做某事或者委托对方做某事。 ---- 共享CPU问题由操作系统的进程调度来实现,进程间的竞争和协作由进程间的通信来完成。进程间的通信一般由操作系统提供编程接口,由程序员在程序中实现。UNIX在这个方面可以说最具特色,它提供了一整套进程间的数据共享与信息交换的处理方法——进程通信机制(IPC)。因此,我们就以UNIX为例来分析进程间通信的各种实现技术。 ---- 在UNIX中,文件(File)、信号(Signal)、无名管道(Unnamed Pipes)、有名管道(FIFOs)是传统IPC功能;新的IPC功能包括消息队列(Message queues)、共享存储段(Shared memory segment)和信号灯(Semapores)。 ---- (1) 信号 ---- 信号机制是UNIX为进程中断处理而设置的。它只是一组预定义的值,因此不能用于信息交换,仅用于进程中断控制。例如在发生浮点错、非法内存访问、执行无效指令、某些按键(如ctrl-c、del等)等都会产生一个信号,操作系统就会调用有关的系统调用或用户定义的处理过程来处理。 ---- 信号处理的系统调用是signal,调用形式是: ---- signal(signalno,action) ---- 其中,signalno是规定信号编号的值,action指明当特定的信号发生时所执行的动作。 ---- (2) 无名管道和有名管道 ---- 无名管道实际上是内存中的一个临时存储区,它由系统安全控制,并且独立于创建它的进程的内存区。管道对数据采用先进先出方式管理,并严格按顺序操作,例如不能对管道进行搜索,管道中的信息只能读一次。 ---- 无名管道只能用于两个相互协作的进程之间的通信,并且访问无名管道的进程必须有共同的祖先。 ---- 系统提供了许多标准管道库函数,如: pipe——打开一个可以读写的管道; close——关闭相应的管道; read——从管道中读取字符; write——向管道中写入字符; ---- 有名管道的操作和无名管道类似,不同的地方在于使用有名管道的进程不需要具有共同的祖先,其它进程,只要知道该管道的名字,就可以访问它。管道非常适合进程之间快速交换信息。 ---- (3) 消息队列(MQ) ---- 消息队列是内存中独立于生成它的进程的一段存储区,一旦创建消息队列,任何进程,只要具有正确的的访问权限,都可以访问消息队列,消息队列非常适合于在进程间交换短信息。 ---- 消息队列的每条消息由类型编号来分类,这样接收进程可以选择读取特定的消息类型——这一点与管道不同。消息队列在创建后将一直存在,直到使用msgctl系统调用或iqcrm -q命令删除它为止。 ---- 系统提供了许多有关创建、使用和管理消息队列的系统调用,如: ---- int msgget(key,flag)——创建一个具有flag权限的MQ及其相应的结构,并返回一个唯一的正整数msqid(MQ的标识符); ---- int msgsnd(msqid,msgp,msgsz,msgtyp,flag)——向队列中发送信息; ---- int msgrcv(msqid,cmd,buf)——从队列中接收信息; ---- int msgctl(msqid,cmd,buf)——对MQ的控制操作; ---- (4) 共享存储段(SM) ---- 共享存储段是主存的一部分,它由一个或多个独立的进程共享。各进程的数据段与共享存储段相关联,对每个进程来说,共享存储段有不同的虚拟地址。系统提供的有关SM的系统调用有: ---- int shmget(key,size,flag)——创建大小为size的SM段,其相应的数据结构名为key,并返回共享内存区的标识符shmid; ---- char shmat(shmid,address,flag)——将当前进程数据段的地址赋给shmget所返回的名为shmid的SM段; ---- int shmdr(address)——从进程地址空间删除SM段; ---- int shmctl (shmid,cmd,buf)——对SM的控制操作; ---- SM的大小只受主存限制,SM段的访问及进程间的信息交换可以通过同步读写来完成。同步通常由信号灯来实现。SM非常适合进程之间大量数据的共享。 ---- (5) 信号灯 ---- 在UNIX中,信号灯是一组进程共享的数据结构,当几个进程竞争同一资源时(文件、共享内存或消息队列等),它们的操作便由信号灯来同步,以防止互相干扰。 ---- 信号灯保证了某一时刻只有一个进程访问某一临界资源,所有请求该资源的其它进程都将被挂起,一旦该资源得到释放,系统才允许其它进程访问该资源。信号灯通常配对使用,以便实现资源的加锁和解锁。 ---- 进程间通信的实现技术的特点是:操作系统提供实现机制和编程接口,由用户在程序中实现,保证进程间可以进行快速的信息交换和大量数据的共享。但是,上述方式主要适合在同一台计算机系统内部的进程之间的通信。 3 应用程序间的通信及其实现技术 ---- 同进程之间的相互制约一样,不同的应用程序之间也存在竞争和协作的关系。UNIX操作系统也提供一些可用于应用程序之间实现数据共享与信息交换的编程接口,程序员可以通过自己编程来实现。如远程过程调用和基于TCP/IP协议的套接字(Socket)编程。但是,相对普通程序员来说,它们涉及的技术比较深,编程也比较复杂,实现起来困难较大。 ---- 于是,一种新的技术应运而生——通过将有关通信的细节完全掩盖在某个独立软件内部,即底层的通讯工作和相应的维护管理工作由该软件内部来实现,用户只需要将通信任务提交给该软件去完成,而不必理会它的具体工作过程——这就是所谓的中间件技术。 ---- 我们在这里分别讨论这三种常用的应用程序间通信的实现技术——远程过程调用、会话编程技术和MQSeries消息队列技术。其中远程过程调用和会话编程属于比较低级的方式,程序员参与的程度较深,而MQSeries消息队列则属于比较高级的方式,即中间件方式,程序员参与的程度较浅。 ---- 4.1 远程过程调用(RPC)
-
MAX_LEN) {
int pivot = partition(arr, left, right);
quicksort_optimized(arr, left, pivot - 1);
quicksort_optimized(arr, pivot + 1, right);
} else {
// 使用插入排序处理小数组
}
}
```
- 合并相同值进行分割:在每次划分后,我们将与枢轴相等的元素聚集在一起,以降低后续迭代中的重复处理。例如:
原序列: 1 4 6 7 6 6 7 6 8 6
- 选取枢轴(6)并划分:1 4 6 7 1 6 7 6 8 6
- 划分结果(未处理相等项):1 4 6 6 7 6 7 6 8 6
- 处理相等项后的划分结果:1 4 6 6 6 6 7 8 7
- 下次划分得到的子序列:1 4 和 7 8 7
通过这样的优化,我们可以明显减少迭代次数,从而提高排序效率。">
改进版快速排序:针对部分有序列的策略与优化技巧" - 随机选枢轴:当数据部分有序时,传统快速排序通过固定枢轴可能导致效率低下。为此,我们采用随机选取枢轴的方法,代码如下: ```c int SelectPivotRandom(int arr[], int low, int high) { srand(time(0)); int pivotPos = (rand() % (high - low)) + low; swap(arr[pivotPos], arr[low]); return arr[low]; } ``` - 优化小数组交换:针对小且部分有序的数组,快速排序不如插入排序高效。因此,当待排序序列长度小于等于10时,我们会切换至插入排序: ```c #define MAX_LEN 10 void quicksort_optimized(int *arr, int left, int right) { int length = right - left; if (length > MAX_LEN) { int pivot = partition(arr, left, right); quicksort_optimized(arr, left, pivot - 1); quicksort_optimized(arr, pivot + 1, right); } else { // 使用插入排序处理小数组 } } ``` - 合并相同值进行分割:在每次划分后,我们将与枢轴相等的元素聚集在一起,以降低后续迭代中的重复处理。例如: 原序列: 1 4 6 7 6 6 7 6 8 6 - 选取枢轴(6)并划分:1 4 6 7 1 6 7 6 8 6 - 划分结果(未处理相等项):1 4 6 6 7 6 7 6 8 6 - 处理相等项后的划分结果:1 4 6 6 6 6 7 8 7 - 下次划分得到的子序列:1 4 和 7 8 7 通过这样的优化,我们可以明显减少迭代次数,从而提高排序效率。
-
在ASP.NET中使用MSMQ进行消息处理
-
【Netty】「萌新入门」(七)ByteBuf 的性能优化-堆内存的分配和释放都是由 Java 虚拟机自动管理的,这意味着它们可以快速地被分配和释放,但是也会产生一些开销。 直接内存需要手动分配和释放,因为它由操作系统管理,这使得分配和释放的速度更快,但是也需要更多的系统资源。 另外,直接内存可以映射到本地文件中,这对于需要频繁读写文件的应用程序非常有用。 此外,直接内存还可以避免在使用 NIO 进行网络传输时发生数据拷贝的情况。在使用传统的 I/O 时,数据必须先从文件或网络中读取到堆内存中,然后再从堆内存中复制到直接缓冲区中,最后再通过 SocketChannel 发送到网络中。而使用直接缓冲区时,数据可以直接从文件或网络中读取到直接缓冲区中,并且可以直接从直接缓冲区中发送到网络中,避免了不必要的数据拷贝和内存分配。 通过 ByteBufAllocator.DEFAULT.directBuffer 方法来创建基于直接内存的 ByteBuf: ByteBuf directBuf = ByteBufAllocator.DEFAULT.directBuffer(16); 通过 ByteBufAllocator.DEFAULT.heapBuffer 方法来创建基于堆内存的 ByteBuf: ByteBuf heapBuf = ByteBufAllocator.DEFAULT.heapBuffer(16); 注意: 直接内存是一种特殊的内存分配方式,可以通过在堆外申请内存来避免 JVM 堆内存的限制,从而提高读写性能和降低 GC 压力。但是,直接内存的创建和销毁代价昂贵,因此需要慎重使用。 此外,由于直接内存不受 JVM 垃圾回收的管理,我们需要主动释放这部分内存,否则会造成内存泄漏。通常情况下,可以使用 ByteBuffer.clear 方法来释放直接内存中的数据,或者使用 ByteBuffer.cleaner 方法来手动释放直接内存空间。 测试代码: public static void testCreateByteBuf { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(16); System.out.println(buf.getClass); ByteBuf heapBuf = ByteBufAllocator.DEFAULT.heapBuffer(16); System.out.println(heapBuf.getClass); ByteBuf directBuf = ByteBufAllocator.DEFAULT.directBuffer(16); System.out.println(directBuf.getClass); } 运行结果: class io.netty.buffer.PooledUnsafeDirectByteBuf class io.netty.buffer.PooledUnsafeHeapByteBuf class io.netty.buffer.PooledUnsafeDirectByteBuf 池化技术 在 Netty 中,池化技术指的是通过对象池来重用已经创建的对象,从而避免了频繁地创建和销毁对象,这种技术可以提高系统的性能和可伸缩性。 通过设置 VM options,来决定池化功能是否开启: -Dio.netty.allocator.type={unpooled|pooled} 在 Netty 4.1 版本以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现; 这里我们使用非池化功能进行测试,依旧使用的是上面的测试代码 testCreateByteBuf,运行结果如下所示: class io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeDirectByteBuf class io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf class io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeDirectByteBuf 可以看到,ByteBuf 类由 PooledUnsafeDirectByteBuf 变成了 UnpooledUnsafeDirectByteBuf; 在没有池化的情况下,每次使用都需要创建新的 ByteBuf 实例,这个操作会涉及到内存的分配和初始化,如果是直接内存则代价更为昂贵,而且频繁的内存分配也可能导致内存碎片问题,增加 GC 压力。 使用池化技术可以避免频繁内存分配带来的开销,并且重用池中的 ByteBuf 实例,减少了内存占用和内存碎片问题。另外,池化技术还可以采用类似 jemalloc 的内存分配算法,进一步提升分配效率。 在高并发环境下,池化技术的优点更加明显,因为内存的分配和释放都是比较耗时的操作,频繁的内存分配和释放会导致系统性能下降,甚至可能出现内存溢出的风险。使用池化技术可以将内存分配和释放的操作集中到预先分配的池中,从而有效地降低系统的内存开销和风险。 内存释放 当在 Netty 中使用 ByteBuf 来处理数据时,需要特别注意内存回收问题。 Netty 提供了不同类型的 ByteBuf 实现,包括堆内存(JVM 内存)实现 UnpooledHeapByteBuf 和堆外内存(直接内存)实现 UnpooledDirectByteBuf,以及池化技术实现的 PooledByteBuf 及其子类。 UnpooledHeapByteBuf:通过 Java 的垃圾回收机制来自动回收内存; UnpooledDirectByteBuf:由于 JVM 的垃圾回收机制无法管理这些内存,因此需要手动调用 release 方法来释放内存; PooledByteBuf:使用了池化机制,需要更复杂的规则来回收内存; 由于池化技术的特殊性质,释放 PooledByteBuf 对象所使用的内存并不是立即被回收的,而是被放入一个内存池中,待下次分配内存时再次使用。因此,释放 PooledByteBuf 对象的内存可能会延迟到后续的某个时间点。为了避免内存泄漏和占用过多内存,我们需要根据实际情况来设置池化技术的相关参数,以便及时回收内存; Netty 采用了引用计数法来控制 ByteBuf 对象的内存回收,在博文 「源码解析」ByteBuf 的引用计数机制 中将会通过解读源码的形式对 ByteBuf 的引用计数法进行深入理解; 每个 ByteBuf 对象被创建时,都会初始化为1,表示该对象的初始计数为1。 在使用 ByteBuf 对象过程中,如果当前 handler 已经使用完该对象,需要通过调用 release 方法将计数减1,当计数为0时,底层内存会被回收,该对象也就被销毁了。此时即使 ByteBuf 对象还在,其各个方法均无法正常使用。 但是,如果当前 handler 还需要继续使用该对象,可以通过调用 retain 方法将计数加1,这样即使其他 handler 已经调用了 release 方法,该对象的内存仍然不会被回收。这种机制可以有效地避免了内存泄漏和意外访问已经释放的内存的情况。 一般来说,应该尽可能地保证 retain 和 release 方法成对出现,以确保计数正确。