欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

深入解析C#中的MSMQ消息队列

最编程 2024-08-06 17:50:55
...

一、引言

        Windows Communication Foundation(WCF)是Microsoft为构建面向服务的应用程序而提供的统一编程模型,该服务模型提供了支持松散耦合和版本管理的序列化功能,并提供了与消息队列(MSMQ)、COM+、Asp.net Web服务、.NET Remoting等微软现有的分布式系统技术。利用WCF平台,开发人员可以很方便地构建面向服务的应用程序(SOA)。WCF是对之前现有的分布式技术(MSMQ、.NET Remoting和Web 服务等技术)的集成和扩展,既然这样,我们就有必要首先了解下之前分布式技术,只有这样才能更深刻地明白WCF所带来的好处。今天就分享下MSMQ这种分布式技术。

二、MSMQ介绍

        MSMQ全称是Microsoft Message Queue——微软消息队列。它是一种异步传输模式,可以在不同的应用之间实现相互通信,相互通信的应用可以分布在同一台机器上,也可以分布于相连的网络空间中的任一位置。

2.1 MSMQ 工作原理

        MSMQ的实现原理是:消息的发送者把自己想要发送的信息放入一个容器(Message),然后把它保存到一个系统公用空间的消息队列(Message Queue)中,本地或异地的消息接收程序再从该队列中取出发给它的消息进行处理。

        消息队列是一个公用存储空间,它可以存在于内存中或物理文件中,因此,消息以两种方式发送,即快递方式和可恢复模式。它们的区别是消息存储位置的不同,快递方式,为了消息的快速传递,所以把消息放置在内存中,而不放在物理磁盘上,以获得较高的处理能力;而可恢复模式在传送过程的每一步骤中,都把消息写入物理磁盘上,这样当保存消息队列的机器发生故障而重新启动后,可以把发送的消息恢复到故障发送之前的状态,以获得更好的消息恢复能力。消息队列可以放在发送方、接收方所在的机器上,也可以单独放置在另外一台机器上。另外,采用消息队列机制,发送方不必要担心接收方是否启动,是否发生故障等因素,只要消息成功发送出去,就可以认为处理完成,而实际上对方可能甚至未开机,或者实际消息传递到对方可能在第二天。MSMQ机制类似QQ消息传递机制。下图演示了MSMQ的实现原理。

MSMQ中主要有两个概念:

(1)一个是消息Message:Message是通信双方需要传递的消息,它可以是文本、图片、视频等。消息包含发送和接收者的标识,只有指定的用户才能取得消息。

(2)一个是队列Queue:用来保存消息的存储空间,MSMQ中主要包括以下几种队列类:

  • 公共队列:在整个消息队列网络中复制,有可能由网络连接的所有站点访问。路径格式为:机器名称\队列名称
  • 专用队列(或叫私有队列):不在整个网络中发布,它们仅在所驻留的本地计算机上可用,专用队列只能由知道队列的完整路径名称或标签的应用程序访问。路径格式为:机器名称\Private$\队列名称
  • 日志队列:包含确认在给定“消息队列中发送的消息回执消息”。路径格式为:机器名称\队列名称\Journal$

       响应队列:包含目标应用程序接收到消息时返回给发送应用程序的响应消息,包括机器日志队列、机器死信队列和机器事务死信队列。其中,机器信道死信队列对应的格式为:机器名称\XactDeadletter$。机器死信队列对应的格式为:机器名称\Deadletter$;机器日志队列对应的格式为:机器名称\Journal$;

2.2 队列引用说明

当创建了一个MessageQueue实例之后,就应指明和哪个队列进行通信,在.NET中有3种访问指定消息队列的方法:

  • 使用路径,消息队列的路径被机器名和队列名唯一确定,所以可以用消息队列路径来指明使用的消息队列。
  • 使用格式名(format name),它是由MSMQ在消息队列创建时生成的唯一标识,个使命不由用户指定,而是由队列管理者自动生成的GUID。
  • 使用标识名(label),它是消息队列创建时由消息管理者指定的带有意义的名字。

三、消息队列的优缺点

        采用消息队列的好处是:由于是异步通信,无论是发送方还是接收方都不同等待对方返回成功消息,就可以执行余下的代码,大大提高了处理的能力;在信息传递过程中,具有故障恢复能力;MSMQ的消息传递机制使得通信的双方具有不同的物理平台成为可能。

消息队列缺点是:不适合Client需要Server端实时交互情况,大量请求时候,响应可能延迟

四、MSMQ封装的System.Messaging类

注:研究System.Messaging类中封装的属性(Property)和方法(Method)

4.1 Message类

Message是MSMQ的数据存储单元,我们的用户数据一般也被填充在Message的body当中。

1、设置发送优先级---属性

例如:

msg.Priority = MessagePriority.Highest;

4.2 XmlMessageFormatter、BinaryMessageFormatter和BinaryMessageFormatter类

        消息序列化可以通过.NET Framework 附带的三个预定义格式化程序来完成,但是由于后两者格式化后的信息通常不能为人阅读,所以经常用到的是XMLMessageFormatter对象,如下所示:

1、用来格式化消息

例如:

// 设置消息队列的格式化器
mq.Formatter = new XmlMessageFormatter(new Type[] { typeof(String) });
//mq.Formatter = new XmlMessageFormatter(new string[] { "System.String" });

4.3 MessageQueue类

        消息(Message)需要保存在msmq队列中,.net中采用System.Messaging.MessageQueue来管理MSMQ队列,它提供能操作MSMQ的绝大多数API,比如:

1、判断指定路径的队列是否存在

        其中path代表队列的路径,表示形式为"主机名\队列名称",例如:".\private$\myQueue",其中"."代表本地主机,"\private$\myQueue"则代表队列的名称,"private$"表示我们创建的是专用队列,在网络上我们可以通过路径来唯一确定一个队列。

例如:

if (MessageQueue.Exists(@".\Private$\TestMSMQ"))
{
    //todo
}

2、创建队列

        path代表队列的路径,transactional表示是否创建事务队列,默认为fasle。事务性消息,可以确保事务中的消息按照顺序传送,只传送一次,并且从目的队列成功的被检索。

例如:事务队列

            /// <summary>
            /// 通过Create方法创建使用指定路径的新消息队列
            /// </summary>
            /// <param name="queuePath"></param>
            public static MessageQueue Createqueue(string queuePath)
            {
                try
                {
                    if (!MessageQueue.Exists(queuePath))
                    {
                        //Create方法
                        //MessageQueue myTranMessage = MessageQueue.Create(@".\private$\TestQueue");

                        //创建事务性的专用消息队列
                        MessageQueue myTranMessage =MessageQueue.Create(@".\private$\TestQueueTrans", true);

                        //创建远程服务器连接的消息队列,这里注意写法
                        //MessageQueue myTranMessage =MessageQueue.Create(@"FormatName:Direct=TCP:192.168.22.232\private$\TestQueue");
                    }
                    else { }
                }
                catch (MessageQueueException e) { }
            }

3、删除队列

例如:

// 删除消息队列
MessageQueue.Delete(queuePath);

4、发送消息到MSMQ

        obj代表我们的用户数据,transation表示将我们的发送操作纳入事务当中。在前面我们说过MSMQ接收的是Message,但是在这里我们看到Send操作并未强制要求我们采用Message类型参数。这是因为当我传入一个Object参数数据时,在Send操作的内部自动的给我们创建了一个Message消息对象,并且将我们的传入的Object参数采用默认的序列化器序列化,然后装入Message的body属性当中,如果我们在Send方法中指定label属性,它将被赋值给Message的同名Label属性。当然我们完全可以自定义一个message对象传入Send方法中。

例如:简单数据发送

mq.send(1000); //发送整型数据 
mq.send("this is a test message!"); //发送字符串

例如:复杂数据发送

Customer customer = new Customer();
customer.FirstName = "copernicus";
customer.LastName = "nicolaus";
mq.send(customer);

例如:示例

/// <summary>
/// 连接消息队列并发送消息到队列
/// </summary>
public static void SendMessage()
{
    try
    {
        //连接到本地的队列
        MessageQueue myQueue = new MessageQueue(".\\private$\\TestQueue");
        Message myMessage = new Message();
        myMessage.Body = "消息内容";
       //设置最高消息优先级
       message.Priority = MessagePriority.Highest;
       //序列化为字符串——注:也可以使用自定义的序列化方法
        myMessage.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
       //发送消息到队列中
       myQueue.Send(myMessage);
        //事务性消息需加上下面几句
        //MessageQueueTransaction myTransaction = new MessageQueueTransaction();
        //启动事务 myTransaction.Begin();
        //myQueue.Send(myMessage, myTransaction);  //加了事务
        //提交事务 myTransaction.Commit();
    }
    catch (ArgumentException e){}
}

5、接收消息

MessageQueue提供了两个方法来接收消息:

(1)Peek方法:从队列中取出消息而不从队列中移除该消息

(2)Receive方法:接收消息同时永久性地从队列中删除消息

例如:

如果知道消息的标识符(ID),还可以通过ReceiveById方法和PeekById方法完成相应的操作。

/// <summary>
/// 连接消息队列并从队列中接收消息
/// </summary>
public static void ReceiveMessage()
{
    //连接到本地队列
    MessageQueue myQueue = new MessageQueue(".\\private$\\TestQueue");
    //注:由于消息的优先级是枚举类型,在直接messages[index].Priority.ToString();这种方式来获取优先级转化到字符串的时候,他需要一个过滤器(Filter),否则会抛出一个InvalidCastExceptionle类型的异常,异常信息"接收消息时未检索到属性 Priority。请确保正确设置了 PropertyFilter。",要解决这问题只需要把消息对象的MessageReadPropertyFilter(过滤器) 的Priority设置为true。
    myQueue.MessageReadPropertyFilter.Priority = true;
    //接收端必须反序列化
    myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
    try
    {    
        //从队列中接收消息
        Message myMessage = myQueue.Receive();
        string context = (string)myMessage.Body; //获取消息的内容
        string priority = messages[index].Priority.ToString();
    }
    catch (MessageQueueException e){}
    catch (InvalidCastException e){}
}

6、异步消息处理

(1)异步发送消息

        /// <summary> 
        /// 发送消息到队列 
        /// </summary> 
        private static void SendMessage()
        {
            MessageQueue myQueue = new MessageQueue(".\\private$\\TestAsyncQueue");
            if (myQueue.Transactional)
            {
                Test test = new Test();
                test.Name = "XiaoMing";
                test.Sex = "男";
                Message message = new Message(test);
                message.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });

                MessageQueueTransaction myTransaction = new MessageQueueTransaction();
                myTransaction.Begin();
                myQueue.Send(message, myTransaction);
                myTransaction.Commit();
                Console.WriteLine("消息成功发送到队列!");
            }
        }

(2)异步接收消息

         /// <summary> 
         /// 异步接收消息 
         /// </summary> 
         private static void AsyncReceiveMessage()
         { 
             MessageQueue myQueue = new MessageQueue(".\\private$\\TestAsyncQueue"); 
             if (myQueue.Transactional) 
             { 
                 MessageQueueTransaction myTransaction = new MessageQueueTransaction(); 
                //这里使用了委托,当接收消息完成的时候就执行MyReceiveCompleted方法 
                myQueue.ReceiveCompleted += new ReceiveCompletedEventHandler(MyReceiveCompleted); 
                myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string)}); 
                myTransaction.Begin(); 
                myQueue.BeginReceive();//启动一个没有超时时限的异步操作 
                myTransaction.Commit(); 
            } 
        }

        private static void MyReceiveCompleted(Object source, ReceiveCompletedEventArgs asyncResult)
        {
            try
            {
                MessageQueue myQueue = (MessageQueue)source;
                //完成指定的异步接收操作 
                Message message = myQueue.EndReceive(asyncResult.AsyncResult);
                Test test = message.Body as Test;
                Console.WriteLine("姓名:{0}--性别:{1}", test.Name.ToString(), test.Sex);
                myQueue.BeginReceive();
            }
            catch (MessageQueueException me)
            {
                Console.WriteLine("异步接收出错,原因:" + me.Message);
            }
        }

 

参考文章:

(1)https://www.cnblogs.com/jx270/p/4943199.html

(2)https://blog.****.net/kisscatforever/article/details/63264247

(3)【线程】https://www.cnblogs.com/beimeng/p/3298190.html

(4)【线程池】https://www.cnblogs.com/tuyile006/p/6894517.html