Windows消息队列MQ的使用

2021-04-23
Windows消息队列MQ的使用

在系统间的进行数据传递,特别是数据量较大,分别插入不同的数据库的时候。


同时保证系统效率和响应时间,减少数据库负担的时候,使用消息队列是非常有帮助的。


代码:(把下面的代码建为基类直接调用就可以了)



using System.Messaging;//头文件




using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

using System.Messaging;



namespace _157

{

    public class MessageService 

    {

        #region Service 

        #endregion



        #region 构造函数

        public MessageService()

        {

        }

     

        #endregion



        public bool SendMessageByLocalPrivate(string messageQueueName, object message)

        {

            var messagePath = string.Format(@".\private$\{0}", messageQueueName);

            MessageQueue mq = null;

            try

            {

                mq = !MessageQueue.Exists(messagePath) ? MessageQueue.Create(messagePath) : new MessageQueue(messagePath);

                mq.Formatter = new BinaryMessageFormatter();

                mq.Send(message);

                return true;

            }

            catch (Exception ex)

            {               

                return false;

            }

            finally

            {

                if (mq != null)

                {

                    mq.Close();

                    mq.Dispose();

                }

            }

        }

        /// <summary>

        /// 远程发送消息

        /// </summary>

        /// <param name="ipAddress">IP地址</param>

        /// <param name="messageQueueName">消息队列名称</param>

        /// <param name="message">发送消息</param>

        /// <returns></returns>

        public bool SendMessageByDomainPrivate(string ipAddress, string messageQueueName, object message)

        {

            var messagePath = string.Format(@"FormatName:DIRECT=TCP:{0}\private$\{1}", ipAddress, messageQueueName);

            MessageQueue mq = null;

            try

            {

                mq = new MessageQueue(messagePath) { Formatter = new BinaryMessageFormatter() };

                mq.Send(message);

                return true;

            }

            catch (Exception ex)

            {

                return false;

            }

            finally

            {

                if (mq != null)

                {

                    mq.Close();

                    mq.Dispose();

                }

            }

        }



        public object GetMessageByLocalPrivate(string messageQueueName)

        {

            var messagePath = string.Format(@".\private$\{0}", messageQueueName);

            MessageQueue mq = null;

            Message msg = null;

            try

            {

                mq = !MessageQueue.Exists(messagePath) ? MessageQueue.Create(messagePath) : new MessageQueue(messagePath);

                mq.Formatter = new BinaryMessageFormatter();

                msg = mq.Receive();

                return msg == null ? null : msg.Body;

            }

            catch (Exception ex)

            {              

                return null;

            }

            finally

            {

                if (mq != null)

                    mq.Close();

                if (msg != null)

                    msg.Dispose();

                if (mq != null)

                    mq.Dispose();

            }

        }



        public object GetMessageByLocalPrivate(string messageQueueName, TimeSpan timeSpan)

        {

            var messagePath = string.Format(@".\private$\{0}", messageQueueName);

            MessageQueue mq = null;

            Message msg = null;

            try

            {

                mq = !MessageQueue.Exists(messagePath) ? MessageQueue.Create(messagePath) : new MessageQueue(messagePath);

                mq.Formatter = new BinaryMessageFormatter();

                msg = mq.Receive(timeSpan);

                return msg == null ? null : msg.Body;

            }

            catch (Exception ex)

            {

                if (ex.Message.Equals("请求操作的超时时间已到。") || ex.Message.Equals("Timeout for the requested operation has expired."))

                    return null;

                return null;

            }

            finally

            {

                if (mq != null)

                    mq.Close();

                if (msg != null)

                    msg.Dispose();

                if (mq != null)

                    mq.Dispose();

            }

        }

        /// <summary>

        /// 远程接收消息

        /// </summary>

        /// <param name="ipAddress">IP地址</param>

        /// <param name="messageQueueName">消息队列名称</param>

        /// <param name="timeSpan">间隔时间(毫秒)</param>

        /// <returns></returns>

        public object GetMessageByDomainPrivate(string ipAddress, string messageQueueName, TimeSpan timeSpan)

        {

            var messagePath = string.Format(@"FormatName:DIRECT=TCP:{0}\private$\{1}", ipAddress, messageQueueName);

            MessageQueue mq = null;

            Message msg = null;

            try

            {

                mq = new MessageQueue(messagePath) { Formatter = new BinaryMessageFormatter() };

                msg = mq.Receive(timeSpan);

                return msg == null ? null : msg.Body;

            }

            catch (Exception ex)

            {

                if (ex.Message.Equals("请求操作的超时时间已到。") || ex.Message.Equals("Timeout for the requested operation has expired."))

                    return null;

                return null;

            }

            finally

            {

                if (mq != null)

                    mq.Close();

                if (msg != null)

                    msg.Dispose();

                if (mq != null)

                    mq.Dispose();

            }

        }

    }

}