好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Windows Azure: Service Bus Brokered Messaging Dead

Windows Azure: Service Bus Brokered Messaging DeadLetterQueue 使用详解

什么是DeadLetterQueue

DeadLetterQueue的概念不仅仅存在于Service Bus Brokered Messaging 服务中,在各种平台的消息队列中间件中,都包含DeadLetterQueue的概念。DeadLetterQueue是在正常的消息队列之外,由系统生成的一个子队列,用来存储“有问题”的消息。这个“有问题”的消息,可能是指发送端发送失败的消息,或者是接收方处理失败的消息,或者是超期未处理的消息。

DeadLetterQueue in Service Bus Brokered Messaging

在Service Bus Brokered Messaging中,每一个Queue与Subscription都包含一个子队列用以实现DeadLetterQueue,这个子队列由Service Bus系统生成,如图所示:

在以下几种情况中,Message将会被移入DeadLetterQueue:

消息接收方,在消息被Complete和Abandon之前,显式调用Message的DeadLetter方法,可将消息移入DeadLetter队列; Queue或Subscription的EnableDeadLetteringOnMessageExpiration设置为true, 当消息过期以后,将自动被移入DeadLetter队列; Subscription的EnableDeadLetteringOnFilterEvaluationExceptions属性设置为true,当消息过滤出现异常时,将自动被移入DeadLetter队列; 当消息的接收次数超过MaxDeliveryCount设置的值时,将自动被移入DeadLetter队列;

显式调用DeadLetter

什么样的情况需要显示调用DeadLetter方法将消息移入DeadLetter队列呢?我考虑了一个场景,假设接收方是一个订单处理系统,发送方将订单信息包含在消息中进行发送,但是这个接收方只能处理固定区域的订单,对于其他区域的订单无法处理,这时,如果接收到其他区域的订单,则可以暂不处理并将其移入DeadLetter队列,等正常的消息处理完成以后,再来对DeadLetter队列中的订单消息做一些特殊处理。

发送消息:

    1:   string  queueName =  "MyQueue" ;
    2:  NamespaceManager namespaceClient = NamespaceManager.Create();
    3:   if  (namespaceClient.QueueExists(queueName))
    4:  {
    5:      namespaceClient.DeleteQueue(queueName);
    6:  }
    7:  QueueDescription queueDescription =  new  QueueDescription(queueName);
    8:  namespaceClient.CreateQueue(queueDescription);
    9:   
   10:  MessagingFactory factory = MessagingFactory.Create();
   11:  QueueClient queueClient = factory.CreateQueueClient(queueName);
   12:  CreateAndSendOrderMessage(Guid.NewGuid().ToString(),  "Beijing" , queueClient);
   13:  CreateAndSendOrderMessage(Guid.NewGuid().ToString(),  "Dalian" , queueClient);
   14:  CreateAndSendOrderMessage(Guid.NewGuid().ToString(),  "Guangzhou" , queueClient);
   15:   
   16:  Console.WriteLine();
   17:  Console.WriteLine( "Press [Enter] to delete queue and exit." );
   18:  Console.ReadLine();
   19:  namespaceClient.DeleteQueue(queueName);
   20:  factory.Close();
   21:  queueClient.Close();

    1:   private   static   void  CreateAndSendOrderMessage( string  orderId,  string  orderRegion, QueueClient sender)
    2:  {
    3:      var message =  new  BrokeredMessage();
    4:      message.Properties.Add( "OrderId" , orderId);
    5:      message.Properties.Add( "OrderRegion" , orderRegion);
    6:      Console.WriteLine( "Sending message of order region {0}." , message.Properties[ "OrderRegion" ]);
    7:      sender.Send(message);
    8:  }

接收消息:

    1:   string  queueName =  "MyQueue" ;
    2:  MessagingFactory factory = MessagingFactory.Create();
    3:  QueueClient queueClient = factory.CreateQueueClient(queueName, ReceiveMode.PeekLock);
    4:  Console.WriteLine( "Reading messages from queue..." );
    5:  BrokeredMessage message =  null ;
    6:   while  ((message = queueClient.Receive(TimeSpan.FromSeconds(2))) !=  null )
    7:  {
    8:       string  orderRegion = message.Properties[ "OrderRegion" ].ToString();
    9:       if  (validatedRegion.Contains(orderRegion))
   10:      {
   11:          Console.WriteLine( "Process the message whose order region is {0}" , orderRegion);
   12:          ProcessMessage(message);
   13:          message.Complete();
   14:      }
   15:       else 
   16:      {
   17:          Console.WriteLine( "DeadLetter the message whose order region is {0}" , orderRegion);
   18:          message.DeadLetter( "Invalid Region" ,  string .Format( "The order region {0} is invalid" , orderRegion));
   19:      }
   20:      message.Dispose();
   21:  }
   22:  queueClient.Close();

处理DeadLetter队列中的消息:

    1:   string  deadLetterQueuePath = QueueClient.FormatDeadLetterPath(queueName);
    2:  QueueClient deadletterQueueClient = factory.CreateQueueClient(deadLetterQueuePath, ReceiveMode.PeekLock);
    3:  Console.WriteLine( "Reading messages from deadletter queue..." );
    4:   
    5:  BrokeredMessage messageInDeadletterQueue =  null ;
    6:   while  ((messageInDeadletterQueue = deadletterQueueClient.Receive(TimeSpan.FromSeconds(2))) !=  null )
    7:  {
    8:      LogDeadletterMessage(messageInDeadletterQueue);
    9:      messageInDeadletterQueue.Complete();
   10:      messageInDeadletterQueue.Dispose();
   11:  }
   12:   
   13:  deadletterQueueClient.Close();

过期消息

假设我们在创建队列或者Subscription时,将默认的消息存活时间设置为5s(当然也可以给每个消息单独设置TimeToLive),并且将EnableDeadLetteringOnMessageExpiration设置为true,如果为false,则过期消息将被直接删除。

    1:   string  queueName =  "MyQueue" ;
    2:  NamespaceManager namespaceClient = NamespaceManager.Create();
    3:   if  (namespaceClient.QueueExists(queueName))
    4:  {
    5:      namespaceClient.DeleteQueue(queueName);
    6:  }
    7:  QueueDescription queueDescription =  new  QueueDescription(queueName)
    8:  {
    9:      DefaultMessageTimeToLive = TimeSpan.FromSeconds(5),
   10:      EnableDeadLetteringOnMessageExpiration =  true 
   11:  };
   12:  namespaceClient.CreateQueue(queueDescription);

那在消息发送以后,等5s我们再去正常接收消息,会发现没有任何消息,消息因超期被移入了DeadLetter队列中,接收消息代码同显示调用DeadLetter。测试结果如图所示:

消息发送:

消息接收:

接收次数超过MaxDeliveryCount值

我们将Queue或Subscription的MaxDeliveryCount属性设置为2,那么一旦消息被接收的次数到达2次以后将会被移入DeadLetter队列。

    1:   string  queueName =  "MyQueue" ;
    2:  NamespaceManager namespaceClient = NamespaceManager.Create();
    3:   if  (namespaceClient.QueueExists(queueName))
    4:  {
    5:      namespaceClient.DeleteQueue(queueName);
    6:  }
    7:  QueueDescription queueDescription =  new  QueueDescription(queueName)
    8:  {
    9:      MaxDeliveryCount = 2
   10:  };
   11:  namespaceClient.CreateQueue(queueDescription);

接收消息:

    1:   string  queueName =  "MyQueue" ;
    2:  MessagingFactory factory = MessagingFactory.Create();
    3:  QueueClient queueClient = factory.CreateQueueClient(queueName, ReceiveMode.PeekLock);
    4:  Console.WriteLine( "Reading messages from queue..." );
    5:  BrokeredMessage message =  null ;
    6:   while  ((message = queueClient.Receive(TimeSpan.FromSeconds(2))) !=  null )
    7:  {
    8:      Console.WriteLine( "Order Id:{0}" , message.Properties[ "OrderId" ]);
    9:      message.Abandon();
   10:      message.Dispose();
   11:  }
   12:  queueClient.Close();

从DeadLette队列中收取消息代码同上。

测试结果:

从图中我们可以看到,总共3个订单消息,每个消息被接收了两次,然后从DeadLetter队列中就能收取到这3条消息。

Filter Evaluation Exception

快崩溃了,一直没能模拟出Filter Evaluation Exception,所以无法为大家提供例证。后期我再补全并上传代码。哪位好心朋友如果清楚Filter Evaluation Exception如何模拟请给我留言,谢谢。

 

 

分类:  Windows Azure

作者: Leo_wl

    

出处: http://www.cnblogs.com/Leo_wl/

    

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

版权信息

查看更多关于Windows Azure: Service Bus Brokered Messaging Dead的详细内容...

  阅读:43次

上一篇: C#截图

下一篇:权限系统