好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

SQL2008使用SSB与数据库异步通信

SQL2008使用SSB与数据库异步通信

我们已经将数据从xls,csv,txt等文件中的解析出来,进行必须的数据验证, 然后将正确的数据以XML格式保存到磁盘,并将事务型数据更新到DB, 比如生成的磁盘文件名,CheckResult,CheckMemo等。 
我们再一起回顾下数据库表设计:


USE  SSB3
GO

CREATE   TABLE   [ BatchInventoryQueue ]
(
TransactionNumber  INT   IDENTITY ( 1 , 1 )  NOT   NULL ,
BatchFileID  INT   NOT   NULL ,
RowIndex  INT   NOT   NULL ,
ItemNumber  INT   NOT   NULL ,
[ FileName ]   NVARCHAR ( 256 )  NULL ,
HasCheck  CHAR ( 1 )  NULL ,
CheckResult  CHAR ( 1 )  NULL ,
CheckMemo  NVARCHAR ( 2000 )  NULL ,
HasSendSSB  CHAR ( 1 )  NULL ,  -- 是否尝试发送过SSB
SSBSendResult  CHAR ( 1 )  NULL , -- 发送SSB是否成功
SSBMemo  NVARCHAR ( 2000 )  NULL , -- SSB处理结果
CONSTRAINT  PK_TransactionNumber_BatchInventoryQueue  PRIMARY   KEY   CLUSTERED
(
    TransactionNumber  ASC
)
)

下面我们要将正确的数据以XML格式发送到数据库中。因为我们面临的数据量非常大, 
所以我们需要考虑负载均衡,比如多台服务器部署。那么就可能会面临数据冲突的问题。 
我这里的情况是要求多台服务器部署,那么如何给服务器分数据,又不造成冲突呢? 
方案一:用表的主键TransactionNumber与服务器数据取模


1   SELECT   TOP ( @BatchSize ) TransactionNumber,
2        [ FileName ]
3   FROM  dbo.BatchInventoryQueue
4   WHERE  HasCheck = ' Y '   AND  CheckResult = ' S '
5   AND   [ FileName ]   IS   NOT   NULL
6   AND  HasSendSSB  IS   NULL   AND  TransactionNumber % @Throtting = @TrottingMod

优点:实现简单。 
缺点:如果其中一台服务器失败,那么应该属于它处理的数据将一直得不到处理。 
方案二:结合SQL Server的锁特性,在查询数据时避免冲突如下:


1   UPDATE   TOP ( @BatchSize ) dbo.BatchInventoryQueue
2   SET  HasSendSSB = ' I ' -- inprocessing
3   OUTPUT DELETED.TransactionNumber,
4       DELETED. [ FileName ]
5   WHERE  HasCheck = ' Y '   AND  CheckResult = ' S '
6   AND   [ FileName ]   IS   NOT   NULL
7   AND  HasSendSSB  IS   NULL

将HasSendSSB更新为I,标示正在发送。由于在更新数据时,进程会获取UPDLOCK,那么下 
一个服务器再查询数据时就必须等待直到第一个进程更新完毕。并发执行情况下,难免会有问题, 
因此需要考虑容错机制。即用另一个Job定时监视(SSBSendResult IS NULL OR SSBSendResult='N') 
AND HasSendSSB='I'的数据,并将状态清空(SET HasSendSSB=NULL,SSBSendResult=NULL), 
等待程序下次再次处理。不过这种情况应该非常少。


1   UPDATE  dbo.BatchInventoryQueue
2   SET  HasSendSSB = NULL ,
3   SSBSendResult = NULL
4   WHERE  (SSBSendResult  IS   NULL   OR  SSBSendResult = ' N ' )
5   AND  HasSendSSB = ' I '

接着我们要生成SSBMessage,这里我使用VTemplate模版引擎来生成.代码如下: 
SSBMessageBase:


 1   public   abstract   class  SSBMessageBase
 2   {
 3        public   string  Subject {  get ;  set ; }
 4  
 5        public   string  FromService {  get ;  set ; }
 6  
 7        public   string  ToService {  get ;  set ; }
 8  
 9        public  SSBMessageHead Head {  get ;  set ; }
10   } 

SSBMessageHead:


1   public   class  SSBMessageHead
2   {
3        public   string  Action {  get ;  set ; }
4  
5        public   string  TransactionCode {  get ;  set ; }
6   } 

SSBMessageFromFile:


1   public   class  SSBMessageFromFile : SSBMessageBase
2   {
3        public   string  FileName {  get ;  set ; }
4   }

 VTemplate:


 1   < vt:template >
 2   < Publish >
 3   < Subject > /Subject>
 4   < FromService > </ FromService >
 5   < ToService > </ ToService >
 6   < Message >
 7        < Head >
 8            < Action > </ Action >
 9            < TransactionCode > </ TransactionCode >
10        </ Head >
11        < Body >
12        < vt:output  file ="$ssb.FileName"  charset ="utf-8"   />
13        </ Body >
14   </ Message >
15   </ Publish >
16   </ vt:template >

SSBUtility:


 1   public   class  SSBUtility
 2   {
 3        private   string  VtSSBMessage(SSBMessageBase ssb)
 4       {
 5            string  fileName  =  Path.Combine(AppDomain.CurrentDomain.BaseDirectory, @" Templates\SSB.vt " );
 6          
 7           TemplateDocument doc  = new  TemplateDocument(fileName, Encoding.UTF8);
 8           doc.SetValue( " ssb " , ssb);
 9  
10           StringBuilder sb  =   new  StringBuilder();
11           StringWriter sw  =   new  StringWriter(sb);
12           doc.Render(sw);
13           sw.Close();
14  
15            return  sb.ToString();
16       }
17  
18        public   void  SendSSB(SSBMessageBase ssb)
19       {
20            string  msg  =  VtSSBMessage(ssb);
21            using  (SqlConnection conn  =   new  SqlConnection(JobConfigs.SSBConnectionString))
22           {
23                using  (SqlCommand cmd  =   new  SqlCommand())
24               {
25                   cmd.Connection  =  conn;
26                   cmd.CommandType  =  CommandType.StoredProcedure;
27                   cmd.CommandText  =  JobConfigs.SSBSendProc; // dbo.[UP_Send_Inventory]
28                   SqlParameter p  =   new  SqlParameter( " @Message " , SqlDbType.Xml);
29                   p.Value  =  msg;
30                   cmd.Parameters.Add(p);
31                   conn.Open();
32                   cmd.ExecuteNonQuery();
33                   conn.Close();
34               }
35           }
36       }
37   } 

再接着就是SSB创建部分,下面列举代码示例:


  1   USE [master]
  2   GO
  3  
  4   IF EXISTS(SELECT  *  FROM sys.databases  where  [name] = ' SSB ' )
  5       DROP DATABASE SSB
  6  
  7   CREATE DATABASE SSB
  8   GO
  9  
 10   -- enable service broker on database
 11   ALTER DATABASE SSB
 12   SET TRUSTWORTHY ON
 13   GO
 14  
 15   USE SSB
 16   GO
 17  
 18   -- create a test table
 19   CREATE TABLE dbo.Inventory
 20   (
 21   [ItemNumber] INT IDENTITY( 1 , 1 ) PRIMARY KEY,
 22   [Inventory] INT NOT NULL
 23   )
 24  
 25   INSERT INTO dbo.Inventory([Inventory])
 26   VALUES( 1 )
 27  
 28   GO
 29  
 30   -- create message type and contract
 31   CREATE MESSAGE TYPE [Message_Type_Inventory]
 32   VALIDATION  =  WELL_FORMED_XML;
 33  
 34   CREATE CONTRACT [Contract_Inventory]
 35   (
 36   [Message_Type_Inventory] SENT BY INITIATOR
 37   )
 38   GO
 39  
 40   -- create queue, send service and receive
 41   CREATE QUEUE [Queue_Inventory]
 42   WITH STATUS = ON,
 43   RETENTION = OFF;
 44  
 45   CREATE SERVICE [Service_Send_Inventory]
 46   ON QUEUE [Queue_Inventory]([Contract_Inventory])
 47   GO
 48  
 49   CREATE SERVICE [Service_Receive_Inventory]
 50   ON QUEUE [Queue_Inventory]([Contract_Inventory])
 51   GO
 52  
 53   -- receive procedure
 54   CREATE PROCEDURE dbo.[UP_Receive_Inventory]
 55   AS
 56   BEGIN
 57       SET NOCOUNT ON;
 58       DECLARE @ConversionHandle UNIQUEIDENTIFIER,
 59           @MessageType SYSNAME,
 60           @Message XML,
 61           @ItemNumber CHAR( 50 ),
 62           @Inventory INT
 63  
 64       WHILE( 1 = 1 )
 65       BEGIN
 66           WAITFOR(   
 67           RECEIVE TOP( 1 ) @ConversionHandle = conversation_handle,
 68                   @MessageType = message_type_name,
 69                   @Message = CAST(message_body AS XML)
 70               FROM [Queue_Inventory]
 71           ),TIMEOUT  1000
 72           IF(@@ROWCOUNT = 0 )
 73               BREAK;
 74  
 75           SELECT @ItemNumber = @Message.value( ' (/Publish/Message/Body/Inventory/ItemNumber/text())[1] ' , ' INT ' ),
 76               @Inventory = @Message.value( ' (/Publish/Message/Body/Inventory/Inventory/text())[1] ' , ' INT ' )
 77          
 78           UPDATE dbo.Inventory
 79           SET Inventory = @Inventory
 80           WHERE ItemNumber = @ItemNumber
 81       END
 82   END
 83   GO
 84  
 85   -- activate queue
 86   ALTER QUEUE [Queue_Inventory]
 87   WITH ACTIVATION
 88   (
 89   PROCEDURE_NAME = [UP_Receive_Inventory],
 90   MAX_QUEUE_READERS = 5 ,
 91   EXECUTE AS OWNER
 92   )
 93   GO
 94  
 95   -- send procedure
 96   CREATE PROCEDURE dbo.[UP_Send_Inventory]
 97   (
 98       @Message XML
 99   )
100   AS
101   BEGIN
102       SET NOCOUNT ON;
103       DECLARE @ConversationHandle UNIQUEIDENTIFIER
104      
105       BEGIN DIALOG CONVERSATION @ConversationHandle
106       FROM SERVICE [Service_Send_Inventory]
107       TO SERVICE  ' Service_Receive_Inventory '
108       ON CONTRACT [Contract_Inventory]
109       WITH ENCRYPTION  =  OFF;
110  
111       SEND ON CONVERSATION @ConversationHandle
112       MESSAGE TYPE [Message_Type_Inventory](@Message)
113      
114       END CONVERSATION @ConversationHandle WITH CLEANUP;
115   END
116   GO
117  
118   -- test data
119   DECLARE @Message XML
120   SELECT @Message = '
121   < Publish >
122   < Subject > BatchInventory </ Subject >
123   < FromService > Service_Send_Inventory </ FromService >
124   < ToService > Service_Receive_Inventory </ ToService >
125   < Message >
126        < Head >
127            < Action > UpdateInventory </ Action >
128            < TransactionCode > 123 </ TransactionCode >
129        </ Head >
130        < Body >
131            < Inventory >
132                < ItemNumber > 1 </ ItemNumber >
133                < Inventory > 200 </ Inventory >
134            </ Inventory >
135        </ Body >
136   </ Message >
137   </ Publish > '
138  
139   SELECT  *  FROM dbo.[Inventory]
140   -- send
141   EXEC dbo.[UP_Send_Inventory] @Message
142  
143   WAITFOR DELAY  ' 00:00:30 '
144   SELECT  *  FROM Inventory

作者: Leo_wl

    

出处: http://www.cnblogs.com/Leo_wl/

    

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

版权信息

查看更多关于SQL2008使用SSB与数据库异步通信的详细内容...

  阅读:54次