好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

C#实现同Active MQ通讯的方法

本文实例讲述了C#实现同Active MQ通讯的方法。分享给大家供大家参考,具体如下:

内容概要:

主要以源码的形式介绍如何用C#实现同Active MQ 的通讯。本文假设你已经正确安装JDK1.6.x,了解Active MQ并有一定的编程基础。

正文:

JMS 程序的最终目的是生产和消费的消息能被其他程序使用,JMS 的 Message 是一个既简单又不乏灵活性的基本格式,允许创建不同平台上符合非JMS 程序格式的消息。
Message 由消息头,属性和消息体三部份组成。
Active MQ支持过滤机制,即生产者可以设置消息的属性(Properties),该属性与消费者端的Selector对应,只有消费者设置的selector与消息的Properties匹配,消息才会发给该消费者。Topic和Queue都支持Selector。

示例代码:

?

using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Windows;

using System.Windows.Controls;

using System.Windows.Data;

using System.Windows.Documents;

using System.Windows.Input;

using System.Windows.Media;

using System.Windows.Media.Imaging;

using System.Windows.Navigation;

using System.Windows.Shapes;

using Apache.NMS;

using System.Diagnostics;

using Apache.NMS.Util;

using System.Windows.Threading;

/*

  * 功能描述:C#使用ActiveMQ示例

  * 修改次数:2

  * 最后更新: by Kagula,2012-07-31

  *

  * 前提条件:

  * [1]apache-activemq-5.4.2

  * [2]Apache.NMS.ActiveMQ-1.5.6-bin

  * [3]WinXP SP3

  * [4]VS2008 SP1

  * [5]WPF工程 With .NET Framework 3.5

  *

  * 启动

  *

  * 不带安全控制方式启动

  * [你的解压路径]\apache-activemq-5.4.2\bin\activemq.bat

  *

  * 安全方式启动

  * 添加环境变量:      ACTIVEMQ_ENCRYPTION_PASSWORD=activemq

  * [你的解压路径]\apache-activemq-5.4.2\bin>activemq xbean:file:conf/activemq-security.xml

  *

  * Active MQ 管理地址

  * http://127.0.0.1:8161/admin/

  * 添加访问"http://127.0.0.1:8161/admin/"的限制

  *

  * 第一步:添加访问限制

  * 修改D:\apache\apache-activemq-5.4.2\conf\jetty.xml文件

  * 下面这行编码,原

  * <property name="authenticate" value="true" />

  * 修改为

  * <property name="authenticate" value="false" />

  *

  * 第二步:修改登录用户名密码,缺省分别为admin,admin

  * D:\apache\apache-activemq-5.4.2\conf\jetty-realm.properties

  *

  * 用户管理(前提:以安全方式启动ActiveMQ)

  *

  * 在[你的解压路径]\apache-activemq-5.4.2\conf\credentials.properties文件中修改默认的用户名密码

  * 在[你的解压路径]\apache-activemq-5.4.2\conf\activemq-security.xml文件中可以添加新的用户名

  * e.g. 添加oa用户,密码同用户名。

  * <authenticationUser username="oa" password="oa" groups="users,admins"/>

  *

  * 在[你的解压路径]\apache-activemq-5.4.2\conf\activemq-security.xml文件中你还可以设置指定的Topic或Queue

  * 只能被哪些用户组read 或 write。

  *

  *

  * 配置C# with WPF项目

  * 项目的[Application]->[TargetFramework]属性设置为[.NETFramework 3.5](这是VS2008WPF工程的默认设置)

  * 添加[你的解压路径]\Apache.NMS.ActiveMQ-1.5.6-bin\lib\Apache.NMS\net-3.5\Apache.NMS.dll的引用

  * Apache.NMS.dll相当于接口

  *

  * 如果是以Debug方式调试

  * 把[你的解压路径]\Apache.NMS.ActiveMQ-1.5.6-bin\build\net-3.5\debug\目录下的

  * Apache.NMS.ActiveMQ.dll文件复制到你项目的Debug目录下

  * Apache.NMS.ActiveMQ.dll相当于实现

  *

  * 如果是以Release方式调试

  * 参考上文,去取Apache.NMS,Release目录下相应的DLL文件,并复制到你项目的Release目录下。

  *

  *

  * 参考资料

  * [1]《C#调用ActiveMQ官方示例》 http://activemq.apache.org/nms/examples.html

  * [2]《ActiveMQ NMS下载地址》http://activemq.apache.org/nms/activemq-downloads.html

  * [3]《Active MQ在C#中的应用示例》 http://HdhCmsTestzzvips测试数据/article/87956.htm

  * [4]《NMS API Reference》http://activemq.apache.org/nms/nms-api.html

  */

namespace testActiveMQSubscriber

{

   /// <summary>

   /// Interaction logic for Window1.xaml

   /// </summary>

   public partial class Window1 : Window

   {

     private static IConnectionFactory connFac;

     private static IConnection connection;

     private static ISession session;

     private static IDestination destination;

     private static IMessageProducer producer;

     private static IMessageConsumer consumer;

     protected static ITextMessage message = null ;

     public Window1()

     {

       InitializeComponent();

       initAMQ( "MyFirstTopic" );

     }

     private void initAMQ(String strTopicName)

     {

       try

       {

         connFac = new NMSConnectionFactory( new Uri( "activemq:failover:(tcp://localhost:61616)" ));

         //新建连接

         //connection = connFac.CreateConnection("oa","oa");//设置连接要用的用户名、密码

         //如果你要持久[订阅],则需要设置ClientId,这样程序运行当中被停止,恢复运行时,能拿到没接收到的消息!

         connection.ClientId = "testing listener" ;

         connection = connFac.CreateConnection(); //如果你是缺省方式启动Active MQ服务,则不需填用户名、密码

         //创建Session

         session = connection.CreateSession();

         //发布/订阅模式,适合一对多的情况

         destination = SessionUtil.GetDestination(session, "topic://" + strTopicName);

         //新建生产者对象

         producer = session.CreateProducer(destination);

         producer.DeliveryMode = MsgDeliveryMode.NonPersistent; //ActiveMQ服务器停止工作后,消息不再保留

         //新建消费者对象:普通[订阅]模式

         //consumer = session.CreateConsumer(destination);//不需要持久[订阅]

         //新建消费者对象:持久"订阅"模式:

         //  持久[订阅]后,如果你的程序被停止工作后,恢复运行,

         //从第一次持久订阅开始,没收到的消息还可以继续收

         consumer = session.CreateDurableConsumer(

           session.GetTopic(strTopicName)

           , connection.ClientId, null , false );

         //设置消息接收事件

         consumer.Listener += new MessageListener(OnMessage);

         //启动来自Active MQ的消息侦听

         connection.Start();

       }

       catch (Exception e)

       {

         //初始化ActiveMQ连接失败,往VS2008的Output窗口写入出错信息!

         Debug.WriteLine(e.Message);

       }

     }

     private void SendMsg2Topic_Click( object sender, RoutedEventArgs e)

     {

       //发送消息

       ITextMessage request = session.CreateTextMessage(DateTime.Now.ToLocalTime()+ " " +tbMsg.Text);

       producer.Send(request);

     }

     protected void OnMessage(IMessage receivedMsg)

     {

       //接收消息

       message = receivedMsg as ITextMessage;

       //UI线程,显示收到的消息

       Dispatcher.Invoke(DispatcherPriority.Normal, new Action(() =>

       {

         DateTime dt = new DateTime();

         ListBoxItem lbi = new ListBoxItem();

         lbi.Content = DateTime.Now.ToLocalTime() + " " + message.Text;

         lbR.Items.Add(lbi);

       }));

     }

   }

}

队列通讯方式,消费者例子

?

using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using Apache.NMS;

using System.Diagnostics;

using log4net;

using Apache.NMS.Util;

using System.Collections;

namespace Cat8637AutoCallServer

{

   public class SMTask

   {

     public String Callee { get ; set ; }

     public String CheckNumber { get ; set ; }

     public int Deadline { get ; set ; }

     public override String ToString()

     {

       return String.Format( "Callee={0},CheckNumber={1},Deadline={2}" ,

         Callee,CheckNumber,Deadline);

     }

   }

   /*

    * 负责接收任务,并把任务放在任务等待队列中。

    */

   public class MQClient

   {

     private static readonly ILog logger = LogManager.GetLogger( typeof (MQClient));

     private static IConnection connection = null ;

     private static ISession session = null ;

     Queue _voiceSMTasks = new Queue();

     public MQClient()

     {

       try

       {

         IConnectionFactory factory = new NMSConnectionFactory( new Uri( "activemq:failover:(tcp://localhost:61616)" ));

         //新建连接

         //connection = connFac.CreateConnection("oa","oa");//设置连接要用的用户名、密码

         connection = factory.CreateConnection();

         session = connection.CreateSession();

         IMessageConsumer consumer = session.CreateConsumer(session.GetQueue( "TaskIssue_VoiceSM" ));

         consumer.Listener += new MessageListener(OnMessage);

         connection.Start();

       }

       catch (Exception ex)

       {

         Debug.WriteLine(ex.Message);

       }

     }

     protected void OnMessage(IMessage receivedMsg)

     {

       IMessage message = receivedMsg as ITextMessage;

       SMTask smTask = new SMTask();

       smTask.Callee = message.Properties[ "Callee" ] as String;

       smTask.CheckNumber = message.Properties[ "Message" ] as String;

       smTask.Deadline = Convert.ToInt32(message.Properties[ "deadline" ] as String);

       logger.Info( "Received: " +smTask.ToString());

       lock (_voiceSMTasks)

       {

         _voiceSMTasks.Enqueue(smTask);

       }

     }

     public SMTask GetVoiceSMTask()

     {

       SMTask result = null ;

       lock (_voiceSMTasks)

       {

         if (_voiceSMTasks.Count > 0)

         {

           result = _voiceSMTasks.Dequeue() as SMTask;

         }

       }

       return result;

     }

   }

}

队列通讯方式,生产者例子

?

private void Send_Click( object sender, RoutedEventArgs e)

{

   try

   {

     IDestination destination = SessionUtil.GetDestination(session, "queue://TaskIssue_VoiceSM" );

     //新建生产者对象

     IMessageProducer producer = session.CreateProducer(destination);

     producer.DeliveryMode = MsgDeliveryMode.NonPersistent; //ActiveMQ服务器停止工作后,消息不再保留

     ITextMessage request = session.CreateTextMessage();

     request.NMSCorrelationID = "TestVoiceSM" ; //这里我填了应用程序的名称。

     request.Properties[ "Callee" ] = tbCallee.Text;

     request.Properties[ "Message" ] = tbCheckNumber.Text;

     request.Properties[ "deadline" ] = tbValidDuration.Text;

     producer.Send(request);

   }

   catch (Exception ex)

   {

     //初始化ActiveMQ连接失败,往VS2008的Output窗口写入出错信息!

     Debug.WriteLine(ex.Message);

   }

}

private void Window_Closed( object sender, EventArgs e)

{

   try

   {

     if (session == null )

       return ;

     //if (connection == null)

     //  return;

     session.Close();

     //connection.Close();

   }

   catch (Exception ex)

   {

     Debug.WriteLine(ex.Message);

   }

}

希望本文所述对大家C#程序设计有所帮助。

dy("nrwz");

查看更多关于C#实现同Active MQ通讯的方法的详细内容...

  阅读:41次