好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

对ServiceStack.Redis的连接池进行故障转移改造

对ServiceStack.Redis的连接池进行故障转移改造

对ServiceStack.Redis的连接池进行故障转移改造

使用ServiceStack.Redis的连接池在操作多台Redis的时候并不会对出现故障的Redis进行排除切换,这样就会导致应用会还是会分配到故障的Redis服务中导致应用处理错误.这次对ServiceStack.Redis连接池的改造主要实现两个功能:1)对 故障的Redis服务在轮循的时候排除,2)定期检测 故障的Redis服务,如果服务正常则恢复到轮盾环节中.( ServiceStack.Redis的代码结构还是很不错修改起来也很方便 )

增加基于Host的连接池功能

ServiceStack.Redis连接池的连接存储结构相对简单,只是用一些简单的数组进行处理也没有明确按Host划分,所以修改起来比较麻烦.通过查看代码决定在RedisEndPoint添加一系列的功能.包括:连接获取,回收,有效性验测等功能.详细代码如下:

     public   class   RedisEndPoint : EndPoint
    {
          public  RedisEndPoint( string  host,  int  port) :  base  (host, port)
        {
        }

          public  RedisEndPoint( string  host,  int  port,  string  password) :  this  (host,port)
        {
              this .Password =  password;
        }

          public   string  Password {  get ;  set  ; }

          public   bool  RequiresAuth {  get  {  return  ! string  .IsNullOrEmpty(Password); } }

          private  System.Collections.Generic.Stack<RedisClient> mStack =  new  System.Collections.Generic.Stack<RedisClient> ();

          private   bool  mAvailable =  true  ;

          private   int  mLastDetectTime =  0  ;

          private   void  PingHost( object   state)
        {
              try  
            {
                RedisClient client  =  Redis.RedisClientFactory.Instance.CreateRedisClient(Host, Port);
                client.Password  =  Password;
                client.EndPoint  =  this  ;
                client.Ping();
                Push(client);
                mAvailable  =  true  ;
                
            }
              catch  
            {

            }
            mIsDetecting  =  false  ;
        }

          private   bool  mIsDetecting =  false  ;

          public   bool   Detect()
        {
              if  (! mAvailable)
            {
                  if  (System.Math.Abs(System.Environment.TickCount - mLastDetectTime) >=  10000  )
                {
                    mLastDetectTime  =  System.Environment.TickCount;
                      if  (! mIsDetecting)
                    {
                        mIsDetecting  =  true  ;
                        System.Threading.ThreadPool.QueueUserWorkItem(PingHost);
                    }
                    
                   
                }
            }
              return   mAvailable;
        }

          public   RedisClient Pop()
        {
              lock   (mStack)
            {
                  if  (mStack.Count >  0  )
                     return   mStack.Pop();
                
            }
            RedisClient client  =  Redis.RedisClientFactory.Instance.CreateRedisClient(Host, Port);
            client.EndPoint  =  this  ;
            client.Password  =  Password;
              return   client;
        }

          public   void   Push(RedisClient client)
        {
              lock   (mStack)
            {
                  if  (! client.HadExceptions)
                {
                    mStack.Push(client);
                }
                  else  
                {
                    client.ClientManager  =  null  ;
                    client.Dispose();
                      while  (mStack.Count >  0  )
                    {
                        client  =  mStack.Pop();
                        client.ClientManager  =  null  ;
                        client.Dispose();
                    }
                    mAvailable  =  false  ;
                }
            }
        }
    } 

比较重要的功能主要是回收和检测,在连接回收的时候判断连接是否存在异常(从代码来看 HadExceptions的设置是由SocketError引发的,因此可以判断当这个值为True的时候存在网络异常 ),如果是则把当前节点标识为不可用,并把池中的所有连接进行清除关闭.检测方法主要是每隔10秒对redis服务进行一个连接和ping操作,如果成功该节点恢复到有效状态. 

修改PooledRedisClientManager 

 为了让新连接池的代码生效,必须修改PooledRedisClientManager几个地方,主要是连接获了和连接回收几个方法的代码.

GetInActiveWriteClient

 ///   <summary> 
         ///   Called within a lock
          ///   </summary> 
         ///   <returns></returns> 
         private   RedisClient GetInActiveWriteClient()
        {
              var  desiredIndex = WritePoolIndex %  writeClients.Length;
              //  this will loop through all hosts in readClients once even though there are 2 for loops
              //  both loops are used to try to get the prefered host according to the round robin algorithm 
             for  ( int  x =  0 ; x < ReadWriteHosts.Count; x++ )
            {
                  var  nextHostIndex = (desiredIndex + x) %  ReadWriteHosts.Count;
                  var  nextHost =  ReadWriteHosts[nextHostIndex];
                  if   (nextHost.Detect())
                {
                    RedisClient client  =  nextHost.Pop();
                      if  (client !=  null  )
                    {
                          if   (nextHost.RequiresAuth)
                            client.Password  =  nextHost.Password;
                        client.Id  = RedisClientCounter++ ;
                        client.ClientManager  =  this  ;
                        client.NamespacePrefix  =  NamespacePrefix;
                        client.ConnectionFilter  =  ConnectionFilter;
                          return   client;
                    }
                }
                  //  for (var i = nextHostIndex; i < writeClients.Length; i += ReadWriteHosts.Count)
                  //  {                    
                  //      if (writeClients[i] != null && !writeClients[i].Active && !writeClients[i].HadExceptions)
                  //          return writeClients[i];
                  //      else if (writeClients[i] == null || writeClients[i].HadExceptions)
                  //      {
                  //          if (writeClients[i] != null)
                  //              writeClients[i].DisposeConnection();
                  //          var client = RedisClientFactory.CreateRedisClient(nextHost.Host, nextHost.Port);

                  //          if (nextHost.RequiresAuth)
                  //              client.Password = nextHost.Password;

                  //          client.Id = RedisClientCounter++;
                  //          client.ClientManager = this;
                  //          client.NamespacePrefix = NamespacePrefix;
                  //          client.ConnectionFilter = ConnectionFilter;
                        
                  //          writeClients[i] = client;

                  //          return client;
                  //      }
                  //  } 
             }
              return   null  ;
        } 

把代码改成直接检测当明的Host是否有效,如果是则获取连接并返回,这里只修改的writerclient,类里面还有readclient的方法也相对应用进行修改.

 GetClient方法代码  

         ///   <summary> 
         ///   Returns a Read/Write client (The default) using the hosts defined in ReadWriteHosts
          ///   </summary> 
         ///   <returns></returns> 
         public   IRedisClient GetClient()
        {
              lock   (writeClients)
            {
                AssertValidReadWritePool();

                RedisClient inActiveClient;
                inActiveClient  =  GetInActiveWriteClient();
                  if (inActiveClient ==  null  )
                      throw   new   TimeoutException(PoolTimeoutError);
                  //  while ((inActiveClient = GetInActiveWriteClient()) == null)
                  //  {
                  //      if (PoolTimeOut.HasValue)
                  //      {
                  //          //   wait for a connection, cry out if made to wait too long
                  //          if (!Monitor.Wait(writeClients, PoolTimeOut.Value))
                  //              throw new TimeoutException(PoolTimeoutError);
                  //      }
                  //      else
                  //          Monitor.Wait(writeClients);
                  //  } 
 
                WritePoolIndex ++ ;
                inActiveClient.Active  =  true  ;

                  if  ( this .ConnectTimeout !=  null  )
                {
                    inActiveClient.ConnectTimeout  =  this  .ConnectTimeout.Value;
                }

                  if (  this  .SocketSendTimeout.HasValue )
                {
                    inActiveClient.SendTimeout  =  this  .SocketSendTimeout.Value;
                }
                  if (  this  .SocketReceiveTimeout.HasValue )
                {
                    inActiveClient.ReceiveTimeout  =  this  .SocketReceiveTimeout.Value;
                }

                inActiveClient.NamespacePrefix  =  NamespacePrefix;

                  //  Reset database to default if changed 
                 if  (inActiveClient.Db !=  Db)
                {
                    inActiveClient.ChangeDb(Db);
                }

                  return   inActiveClient;
            }
        } 

DisposeClient方法代码

         public   void   DisposeClient(RedisNativeClient client)
        {
              if  (client.EndPoint !=  null  )
            {
                client.EndPoint.Push((RedisClient)client);
                  return  ;
            }
              //  lock (readClients)
              //  {
              //      for (var i = 0; i < readClients.Length; i++)
              //      {
              //          var readClient = readClients[i];
              //          if (client != readClient) continue;
              //          client.Active = false;
              //          Monitor.PulseAll(readClients);
              //          return;
              //      }
              //  } 

通过以上简单的代码修改后ServiceStack.Redis的连接池就具备了故意迁移和恢复的功能:)

可靠、高性能的Socket TCP通讯组件
开源数据库访问组件

作者: Leo_wl

    

出处: http://www.cnblogs.com/Leo_wl/

    

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

版权信息

查看更多关于对ServiceStack.Redis的连接池进行故障转移改造的详细内容...

  阅读:33次