Socket连接池
Socket连接池
“池”这个概念好像最早是在操作系统的课上听过的,到后来出来工作的第二天组长也跟我提起“池”这个东东。它给我的感觉是某种对象的集合,如果要用的话就取出,不用的话就放回。在学多线程的时候有接触过线程池,在写《 Socket 一对多通信 》的时候想到了Socket连接池这回事,不过在网上谷歌了一下,发现这类的文章貌似不多,看了一下园友的博文《 高性能Socket设计实现 》,获益良多,下了份源码来看,虽然有一部分看不明白,而且由于个人水平跑不了那份代码,但是从中我学到了不少,至少我写的“池”有一部分是用了这位田大哥的思想。
先来分析各个类之间的结构,整个连接池里面实际上是有两个池,一个是在异步通信中可以重复利用的SocketAsyncEventArgs池(当然处于别的方面考虑,池里面并不是单纯放SocketAsyncEventArgs的实例集合),另一个是接收数据时用到的byte[]缓冲池。而这两个池是外部不能访问的,外部通过一个控制(或者叫管理)的类进行操作。继承了前两篇博文中的异步通信的思想,最底的一个类存放了一次连接的信息,包括两个SocketAsyncEventArgs实例和通信另一端的Socket实例。下面将逐部分介绍。
1 /// <summary>
2 /// 连接单元
3 /// </summary>
4 class ConnectionUnit:IDisposable
5 {
6 private string _uid; // 单元的编号,默认为-1
7 private bool _state; // 单元的状态,true表示使用,false表示空闲
8 private SocketAsyncEventArgs _sendArg; // 专用于发送
9 private SocketAsyncEventArgs _recArg; // 专用于接收
10 internal Socket client { get ; set ; } // 客户端的socket实例
11 internal ArrayList tempArray { get ; set ; } // 暂存已接收的数据,避免粘包用的
12
13
14 public ConnectionUnit( string UID)
15 {
16 _uid = UID;
17 tempArray = new ArrayList();
18 }
19
20 public ConnectionUnit() : this ( " -1 " ) { }
21
22 public void Dispose()
23 {
24 if (_sendArg != null )
25 _sendArg.Dispose();
26 if (_recArg != null )
27 _recArg.Dispose();
28
29 _sendArg = null ;
30 _recArg = null ;
31 }
32 }
这个与之前一篇讲异步通信的博文一样,一个Socket两个SocketAsyncEventArgs。为了取客户端的Socket方便一点,两个SocketAsyncEventArgs一个用于收,一个用于发,田大哥说这样可以实现双工。的确是这样,当初把类定成这样也是为了在收的同时也能发。以上代码删掉了把字段装成属性的那部分。
SocketAsyncEventArgsPool这个类就是上面链接单元的一个池,这个池才是整个连接池里最核心的池,也是真正意义上的池。池里面用了两个集合来存放这一堆连接单元。分别是空闲栈(freeCollecton)和在线字典集(busyCollection)。对于这样的一个池,我就认为,只要在我需要用的时候把对象取出来,顶多在去的时候给一些参数,让池帮忙把这个对象配置好,我就拿来使用,等到我用完了,把他放回池里,池就把对象还原,等待下次再使用。正体感觉这个池对外有点像个栈,取的时候就Pop,放的时候就Push,此外还提供了两个与栈不相干的的方法,根据编号获取在线的连接单元和获取所有在线连接单元的编号。
1 class SocketAsyncEventArgsPool:IDisposable
2 {
3 private Dictionary< string , ConnectionUnit> busyCollection;
4 private Stack<ConnectionUnit> freeCollecton;
5
6 internal SocketAsyncEventArgsPool()
7 {
8 busyCollection = new Dictionary< string , ConnectionUnit> ();
9 freeCollecton = new Stack<ConnectionUnit> ();
10 }
11
12 /// <summary>
13 /// 取出
14 /// </summary>
15 internal ConnectionUnit Pop( string uid)
16 {
17 ConnectionUnit unit = freeCollecton.Pop();
18 unit.State = true ;
19 unit.Uid = uid;
20 busyCollection.Add(uid, unit);
21 return unit;
22 }
23
24 /// <summary>
25 /// 放回
26 /// </summary>
27 internal void Push(ConnectionUnit unit)
28 {
29 if (! string .IsNullOrEmpty(unit.Uid) && unit.Uid != " -1 " )
30 busyCollection.Remove(unit.Uid);
31 unit.Uid = " -1 " ;
32 unit.State = false ;
33 freeCollecton.Push(unit);
34 }
35
36 /// <summary>
37 /// 获取
38 /// </summary>
39 internal ConnectionUnit GetConnectionUnitByUID( string uid)
40 {
41 if (busyCollection.ContainsKey(uid))
42 return busyCollection[uid];
43 return null ;
44 }
45
46 /// <summary>
47 ///
48 /// </summary>
49 internal string [] GetOnLineList()
50 {
51 return busyCollection.Keys.ToArray();
52 }
53
54 public void Dispose()
55 {
56 foreach (KeyValuePair< string ,ConnectionUnit> item in busyCollection)
57 item.Value.Dispose();
58
59 busyCollection.Clear();
60
61 while (freeCollecton.Count > 0 )
62 freeCollecton.Pop().Dispose();
63 }
BufferManager这个是专给接收的SocketAsyncEventArgs用的缓冲池,整一个连接池里面所有接收用的缓冲区都用这个BufferManager,参照田大哥的思想,现在内存里开辟一大片区域存放byte,然后给每一个接收用的SocketAsyncEventArgs分配一块。
1 class BufferManager:IDisposable
2 {
3 private byte [] buffers; // 缓冲池
4 private int bufferSize; // 每个单元使用的大小
5 private int allSize; // 池的总大小
6 private int currentIndex; // 当前可用的索引
7 private Stack< int > freeIndexs; // 已使用过的空闲索引
8
9 /// <summary>
10 /// 构造缓存池
11 /// </summary>
12 /// <param name="buffersSize"> 池总大小 </param>
13 /// <param name="defaultSize"> 默认单元大小 </param>
14 internal BufferManager( int buffersSize, int defaultSize)
15 {
16 this .bufferSize= defaultSize;
17 this .allSize= buffersSize;
18 currentIndex= 0 ;
19 this .buffers = new byte [allSize];
20 freeIndexs = new Stack< int > ();
21 }
22
23 /// <summary>
24 /// 给SocketAsyncEventArgs设置缓冲区
25 /// </summary>
26 internal bool SetBuffer(SocketAsyncEventArgs e)
27 {
28 // 首先看看空闲栈里有没有空闲的区域,有就使用
29 if (freeIndexs.Count > 0 )
30 {
31 e.SetBuffer(buffers, freeIndexs.Pop(), bufferSize);
32 }
33 else
34 {
35 // 没有就得从buffers里取,如果buffers用光了当然取不了
36 if ((allSize - currentIndex) < bufferSize) return false ;
37 e.SetBuffer(buffers, currentIndex, bufferSize);
38 currentIndex += bufferSize;
39 }
40 return true ;
41 }
42
43 /// <summary>
44 /// 释放SocketAsyncEventArgs的缓冲区
45 /// </summary>
46 /// <param name="e"></param>
47 internal void FreeBuffer(SocketAsyncEventArgs e)
48 {
49 // 把索引放到空闲索引栈里面,供下次取的时候重复利用
50 freeIndexs.Push(e.Offset);
51 // 同时清空这部分区域的数据免得上次使用时遗留的数据会掺
52 // 和到下次读取的数据中
53 for ( int i = e.Offset; i < e.Offset + bufferSize; i++ )
54 {
55 if (buffers[i] == 0 ) break ;
56 buffers[i] = 0 ;
57 }
58 e.SetBuffer( null , 0 , 0 );
59 }
60
61 public void Dispose()
62 {
63 buffers = null ;
64 freeIndexs = null ;
65 }
66 }
其实上面两个池都是很大程度参照了《 高性能Socket设计实现 》中的内容。下面这个SocketPoolController是对外的类,这个类的设计参照的就没那么多了。而对于一个Socket通信(服务端的)来说,无非都是三件事,接受连接,接收数据,发送数据。这三件事我再操作类里面是这样做的
接受连接:接受再运行池的时候就开始了,异步循环地接受,执行一次异步接受就阻塞,等到接受完成才被唤醒。
1 /// <summary>
2 /// 异步Accept客户端的连接
3 /// </summary>
4 void MyAsyncAccept()
5 {
6 // 这里使用Action的方式异步循环接受客户端的连接
7 // 模仿同事的做法没开线程,不知这种方式是好是坏
8 Action callback = new Action( delegate ()
9 {
10 while ( true )
11 {
12 // 每次接受都要新开一个SocketAsyncEventArgs,否则会报错
13 // 其实我也想重复利用的
14 SocketAsyncEventArgs e = new SocketAsyncEventArgs();
15 e.Completed += new EventHandler<SocketAsyncEventArgs> (Accept_Completed);
16
17 acceptLock.Reset();
18 server.AcceptAsync(e);
19 // 在异步接受完成之前阻塞当前线程
20 acceptLock.WaitOne();
21 }
22 });
23 callback.BeginInvoke( null , null );
24 }
25
26 void Accept_Completed( object sender, SocketAsyncEventArgs e)
27 {
28 Socket client = e.AcceptSocket;
29 try
30 {
31 if (client.Connected)
32 {
33 IPEndPoint point = client.RemoteEndPoint as IPEndPoint;
34 string uid = point.Address + " : " + point.Port;
35 ConnectionUnit unit = pool.Pop(uid);
36 unit.client = client;
37 unit.State = true ;
38 unit.Uid = uid;
39 unit.RecArg.UserToken = unit;
40 unit.SendArg.UserToken = unit;
41 buffer.SetBuffer(unit.RecArg);
42
43 // 在接受成功之后就开始接收数据了
44 client.ReceiveAsync(unit.RecArg);
45 // 设置并发限制信号和增加当前连接数
46 semaphoreAccept.WaitOne();
47 Interlocked.Increment( ref currentConnect);
48
49 if (OnAccept != null ) OnAccept(uid);
50 }
51 else if (client != null )
52 {
53 client.Close();
54 client.Dispose();
55 client = null ;
56 }
57 }
58 catch (Exception ex) { Console.WriteLine(ex.ToString()); }
59 // 设置Accept信号,以便下次Accept的执行
60 acceptLock.Set();
61 e.Dispose();
62 }
接收消息:在异步接受成功的时候开始接收,每次接收完成之后就进行下一次接收,直到客户端断开连接才终止。
1 void RecArg_Completed( object sender, SocketAsyncEventArgs e)
2 {
3 Socket client = sender as Socket;
4 ConnectionUnit unit = e.UserToken as ConnectionUnit;
5 // 这里大致与上一篇异步通信的一样,只是对缓冲区的处理有一点差异
6 if (e.SocketError == SocketError.Success)
7 {
8 int rec = e.BytesTransferred;
9 if (rec == 0 )
10 {
11 CloseSocket(unit);
12 return ;
13 }
14 if (client.Available > 0 )
15 {
16 unit.tempArray.AddRange(e.Buffer);
17 buffer.FreeBuffer(unit.RecArg);
18 buffer.SetBuffer(unit.RecArg);
19 client.SendAsync(unit.RecArg);
20 return ;
21 }
22 byte [] data = e.Buffer;
23 int len = rec;
24 if (unit.tempArray.Count != 0 )
25 {
26 foreach ( byte item in data)
27 {
28 if (item == 0 ) break ;
29 unit.tempArray.Add(item);
30 }
31 data = unit.tempArray.ToArray( typeof ( byte )) as byte [];
32 rec = data.Length;
33 unit.tempArray.Clear();
34 }
35
36 string dataStr = Encoding.ASCII.GetString(data, 0 , len);
37 if (OnReceive != null )
38 OnReceive(unit.Uid, dataStr);
39
40 if (!unit.State) return ;
41 buffer.FreeBuffer(e);
42 buffer.SetBuffer(e);
43 client.ReceiveAsync(e);
44 }
45 // 这里还多个了一个关闭当前连接
46 else
47 {
48 CloseSocket(unit);
49 }
50 }
51 /// <summary>
52 /// 关闭一个连接单元
53 /// </summary>
54 private void CloseSocket( ConnectionUnit unit )
55 {
56 // 关闭并释放客户端socket的字眼
57 if (unit.client != null )
58 {
59 unit.client.Shutdown(SocketShutdown.Both);
60 unit.client.Dispose();
61 unit.client = null ;
62 }
63 // 把连接放回连接池
64 pool.Push(unit);
65 // 释放并发信号
66 semaphoreAccept.Release();
67 // 减少当前连接数
68 Interlocked.Decrement( ref currentConnect);
69 }
发送消息:外放方法,在需要的时候自行调用方法发送。
1 /// <summary>
2 /// 发送消息
3 /// </summary>
4 /// <param name="uid"></param>
5 /// <param name="message"></param>
6 public void SendMessage( string uid, string message)
7 {
8 sendLock.Reset();
9 ConnectionUnit unit= pool.GetConnectionUnitByUID(uid);
10 // 如果获取不了连接单元就不发送了,
11 if (unit == null )
12 {
13 if (OnSend!= null ) OnSend(uid, " 100 " );
14 sendLock.Set();
15 return ;
16 }
17 byte [] datas = Encoding.ASCII.GetBytes(message);
18 unit.SendArg.SetBuffer(datas, 0 , datas.Length);
19 unit.client.SendAsync(unit.SendArg);
20 // 阻塞当前线程,等到发送完成才释放
21 sendLock.WaitOne();
22 }
23
24 void SendArg_Completed( object sender, SocketAsyncEventArgs e)
25 {
26 Socket client = sender as Socket;
27 ConnectionUnit unit = e.UserToken as ConnectionUnit;
28 // 这里的消息码有三个,2字头的是成功的,1字头是不成功的
29 // 101是未知错误,100是客户端不在线
30 if (e.SocketError == SocketError.Success)
31 if (OnSend != null ) OnSend(unit.Uid, " 200 " );
32 else if (OnSend != null ) OnSend(unit.Uid, " 101 " );
33 // 释放信号,以便下次发送消息执行
34 sendLock.Set();
35 }
下面则是类里面的一些字段信息和构造函数
1 /// <summary>
2 /// 初始化池的互斥体
3 /// </summary>
4 private Mutex mutex = new Mutex();
5
6 /// <summary>
7 /// Accept限制信号
8 /// </summary>
9 private Semaphore semaphoreAccept;
10
11 /// <summary>
12 /// Accept信号
13 /// </summary>
14 private static ManualResetEvent acceptLock = new ManualResetEvent( false );
15
16 /// <summary>
17 /// Send信号
18 /// </summary>
19 private static ManualResetEvent sendLock = new ManualResetEvent( false );
20
21 /// <summary>
22 /// 最大并发数(连接数)
23 /// </summary>
24 private int maxConnect;
25
26 /// <summary>
27 /// 当前连接数(并发数)
28 /// </summary>
29 private int currentConnect;
30
31 /// <summary>
32 /// 缓冲池
33 /// </summary>
34 private BufferManager buffer;
35
36 /// <summary>
37 /// SocketasyncEventArgs池
38 /// </summary>
39 private SocketAsyncEventArgsPool pool;
40
41 /// <summary>
42 /// 服务端Socket
43 /// </summary>
44 private Socket server;
45
46 /// <summary>
47 /// 完成接受的委托
48 /// </summary>
49 public delegate void AcceptHandler( string uid);
50
51 /// <summary>
52 /// 完成发送的委托
53 /// </summary>
54 public delegate void SendHandler( string uid, string result);
55
56 /// <summary>
57 /// 完成接收的委托
58 /// </summary>
59 public delegate void RecevieHandler( string uid, string data);
60
61 /// <summary>
62 /// 完成接受事件
63 /// </summary>
64 public event AcceptHandler OnAccept;
65
66 /// <summary>
67 /// 完成发送事件
68 /// </summary>
69 public event SendHandler OnSend;
70
71 /// <summary>
72 /// 完成接收事件
73 /// </summary>
74 public event RecevieHandler OnReceive;
75
76 /// <summary>
77 /// 构造函数
78 /// </summary>
79 /// <param name="buffersize"> 单元缓冲区大小 </param>
80 /// <param name="maxCount"> 并发总数 </param>
81 public SocketPoolController( int buffersize, int maxCount)
82 {
83 buffer = new BufferManager(buffersize * maxCount,buffersize);
84 this .currentConnect = 0 ;
85 this .maxConnect = maxCount;
86 this .currentConnect = 0 ;
87 this .pool = new SocketAsyncEventArgsPool();
88 // 设置并发数信号,经试验过是并发数-1才对
89 this .semaphoreAccept = new Semaphore(maxCount- 1 , maxCount- 1 );
90 InitPool();
91 }
构造函数里用到的方法
1 /// <summary>
2 /// 初始化SocketAsyncEventArgs池
3 /// 这里主要是给空闲栈填充足够的实例
4 /// </summary>
5 private void InitPool()
6 {
7 ConnectionUnit unit = null ;
8 for ( int i = 0 ; i < maxConnect; i++ )
9 {
10 unit = new ConnectionUnit();
11 unit.Uid = " -1 " ;
12 unit.RecArg = new SocketAsyncEventArgs();
13 unit.RecArg.Completed += new EventHandler<SocketAsyncEventArgs> (RecArg_Completed);
14 unit.SendArg = new SocketAsyncEventArgs();
15 unit.SendArg.Completed += new EventHandler<SocketAsyncEventArgs> (SendArg_Completed);
16 this .pool.Push(unit);
17 }
18 }
其他外放专门控制池的方法
1 /// <summary>
2 /// 启动池
3 /// </summary>
4 /// <param name="ipAddress"> 服务端的IP </param>
5 /// <param name="port"> 端口 </param>
6 public void RunPool( string ipAddress, int port)
7 {
8 IPEndPoint endpoint = new IPEndPoint(IPAddress.Parse(ipAddress), port);
9 server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
10 server.Bind(endpoint);
11 server.Listen( 100 );
12
13 // 调用方法异步Accept客户端的连接
14 MyAsyncAccept();
15 // 设置信号,防止再池在已经启动的情况下再次启动
16 mutex.WaitOne();
17 }
18
19 /// <summary>
20 /// 停止池
21 /// </summary>
22 public void StopPool()
23 {
24 // 把服务端的socket关了
25 if (server != null )
26 server.Close();
27 // 释放互斥信号,等待下次启动
28 mutex.ReleaseMutex();
29 // 释放资源
30 Dispose();
31 }
要把这个操作类与现实的事物类比的话,这个与我们平常跟某个或一堆人聊天时差不多,别人说的东西,不用你自己控制你都会听得到(排除带上耳机,塞住耳朵这种极端的情况),所以接收消息那个方法就不需要了,而你要说什么这个要靠个人控制,而事件那些也好类比,OnAccept就相当于某人加入到这次聊天中你会做些什么(say hello 或者无视),OnRecevie就听到别人说什么自己有什么反应,OnSend就自己说完话之后有什么反应(有时候发现说错了会纠正,有时觉得自己说得好笑的也笑一场诸如此类)。
在使用的时候可以这样子
1 SocketPoolController pool; 2 pool = new SocketPoolController( 32 * 1024 , 1000 ); 3 pool.OnReceive += new SocketPoolController.RecevieHandler(pool_OnReceive); 4 pool.OnSend += new SocketPoolController.SendHandler(pool_OnSend); 5 pool.OnAccept += new SocketPoolController.AcceptHandler(pool_OnAccept); 6 pool.RunPool( " 127.0.0.1 " , 8081 ); 7 Console.WriteLine( " Pool has run\r\npress any key to stop... " ); 8 Console.ReadKey(); 9 pool.StopPool(); 10 Console.WriteLine( " Pool has stop\r\npress any key to exit... " ); 11 Console.ReadLine();
在池里面有一部分地方看似跟用同步的差不多,像接受客户端的连接,发送消息这些地方。可是用这种异步,万一客户端突然断开连接也不会有同步那样马上抛异常。还有一点的是在这份代码里面缺少了对异常的捕捉,有一部分错误我在测试的过程中设了判断避开了。以前跟某只猫提过Socket编程会与多线程一起使用,我也觉得是这样,在之前一篇博文 《Socket一对多通信》里我也用到线程,后来的异步通信也是有线程的,不过不是.net framework自行创建的。看了田大哥的博文给我的另一个收获是信号量的使用,在以前不懂得使用信号量,只会设置一大堆标识状态的布尔值或整形的变量来计数判断。田大哥的博文介绍的时高性能的Socket,而我的这个应该性能不会高到哪里去。上面有什么说错的请各位指出,有什么说漏的,请各位提点,多多指导。谢谢!
整份源码
1 /// <summary>
2 /// 连接单元
3 /// </summary>
4 class ConnectionUnit:IDisposable
5 {
6 private string _uid; // 单元的编号,默认为-1
7 private bool _state; // 单元的状态,true表示使用,false表示空闲
8 private SocketAsyncEventArgs _sendArg; // 专用于发送
9 private SocketAsyncEventArgs _recArg; // 专用于接收
10 internal Socket client { get ; set ; } // 客户端的socket实例
11 internal ArrayList tempArray { get ; set ; } // 暂存已接收的数据,避免粘包用的
12
13 public string Uid
14 {
15 get { return _uid; }
16 set { _uid = value; }
17 }
18
19 public ConnectionUnit( string UID)
20 {
21 _uid = UID;
22 tempArray = new ArrayList();
23 }
24
25 public ConnectionUnit() : this ( " -1 " ) { }
26
27 public bool State
28 {
29 get { return _state; }
30 set { _state = value; }
31 }
32
33 public SocketAsyncEventArgs SendArg
34 {
35 get { return _sendArg; }
36 set { _sendArg = value; }
37 }
38
39 public SocketAsyncEventArgs RecArg
40 {
41 get { return _recArg; }
42 set { _recArg = value; }
43 }
44
45 public void Dispose()
46 {
47 if (_sendArg != null )
48 _sendArg.Dispose();
49 if (_recArg != null )
50 _recArg.Dispose();
51
52 _sendArg = null ;
53 _recArg = null ;
54 }
55 }
56
57 class BufferManager:IDisposable
58 {
59 private byte [] buffers;
60 private int bufferSize;
61 private int allSize;
62 private int currentIndex;
63 private Stack< int > freeIndexs;
64
65 /// <summary>
66 /// 构造缓存池
67 /// </summary>
68 /// <param name="buffersSize"> 池总大小 </param>
69 /// <param name="defaultSize"> 默认单元大小 </param>
70 internal BufferManager( int buffersSize, int defaultSize)
71 {
72 this .bufferSize= defaultSize;
73 this .allSize= buffersSize;
74 currentIndex= 0 ;
75 this .buffers = new byte [allSize];
76 freeIndexs = new Stack< int > ();
77 }
78
79 /// <summary>
80 ///
81 /// </summary>
82 /// <param name="e"></param>
83 /// <param name="offSet"></param>
84 /// <returns></returns>
85 internal bool SetBuffer(SocketAsyncEventArgs e)
86 {
87 if (freeIndexs.Count > 0 )
88 {
89 e.SetBuffer(buffers, freeIndexs.Pop(), bufferSize);
90 }
91 else
92 {
93 if ((allSize - currentIndex) < bufferSize) return false ;
94 e.SetBuffer(buffers, currentIndex, bufferSize);
95 currentIndex += bufferSize;
96 }
97 return true ;
98 }
99
100 /// <summary>
101 ///
102 /// </summary>
103 /// <param name="e"></param>
104 internal void FreeBuffer(SocketAsyncEventArgs e)
105 {
106 freeIndexs.Push(e.Offset);
107 for ( int i = e.Offset; i < e.Offset + bufferSize; i++ )
108 {
109 if (buffers[i] == 0 ) break ;
110 buffers[i] = 0 ;
111 }
112 e.SetBuffer( null , 0 , 0 );
113 }
114
115 public void Dispose()
116 {
117 buffers = null ;
118 freeIndexs = null ;
119 }
120 }
121
122 class SocketAsyncEventArgsPool:IDisposable
123 {
124 private Dictionary< string , ConnectionUnit> busyCollection;
125 private Stack<ConnectionUnit> freeCollecton;
126
127 internal SocketAsyncEventArgsPool()
128 {
129 busyCollection = new Dictionary< string , ConnectionUnit> ();
130 freeCollecton = new Stack<ConnectionUnit> ();
131 }
132
133 /// <summary>
134 /// 取出
135 /// </summary>
136 internal ConnectionUnit Pop( string uid)
137 {
138 ConnectionUnit unit = freeCollecton.Pop();
139 unit.State = true ;
140 unit.Uid = uid;
141 busyCollection.Add(uid, unit);
142 return unit;
143 }
144
145 /// <summary>
146 /// 放回
147 /// </summary>
148 internal void Push(ConnectionUnit unit)
149 {
150 if (! string .IsNullOrEmpty(unit.Uid) && unit.Uid != " -1 " )
151 busyCollection.Remove(unit.Uid);
152 unit.Uid = " -1 " ;
153 unit.State = false ;
154 freeCollecton.Push(unit);
155 }
156
157 /// <summary>
158 /// 获取
159 /// </summary>
160 internal ConnectionUnit GetConnectionUnitByUID( string uid)
161 {
162 if (busyCollection.ContainsKey(uid))
163 return busyCollection[uid];
164 return null ;
165 }
166
167 /// <summary>
168 ///
169 /// </summary>
170 internal string [] GetOnLineList()
171 {
172 return busyCollection.Keys.ToArray();
173 }
174
175 public void Dispose()
176 {
177 foreach (KeyValuePair< string ,ConnectionUnit> item in busyCollection)
178 item.Value.Dispose();
179
180 busyCollection.Clear();
181
182 while (freeCollecton.Count > 0 )
183 freeCollecton.Pop().Dispose();
184 }
185 }
186
187 public class SocketPoolController:IDisposable
188 {
189 /// <summary>
190 /// 初始化池的互斥体
191 /// </summary>
192 private Mutex mutex = new Mutex();
193
194 /// <summary>
195 /// Accept限制信号
196 /// </summary>
197 private Semaphore semaphoreAccept;
198
199 /// <summary>
200 /// Accept信号
201 /// </summary>
202 private static ManualResetEvent acceptLock = new ManualResetEvent( false );
203
204 /// <summary>
205 /// Send信号
206 /// </summary>
207 private static ManualResetEvent sendLock = new ManualResetEvent( false );
208
209 /// <summary>
210 /// 最大并发数(连接数)
211 /// </summary>
212 private int maxConnect;
213
214 /// <summary>
215 /// 当前连接数(并发数)
216 /// </summary>
217 private int currentConnect;
218
219 /// <summary>
220 /// 缓冲池
221 /// </summary>
222 private BufferManager buffer;
223
224 /// <summary>
225 /// SocketasyncEventArgs池
226 /// </summary>
227 private SocketAsyncEventArgsPool pool;
228
229 /// <summary>
230 /// 服务端Socket
231 /// </summary>
232 private Socket server;
233
234 /// <summary>
235 /// 完成接受的委托
236 /// </summary>
237 public delegate void AcceptHandler( string uid);
238
239 /// <summary>
240 /// 完成发送的委托
241 /// </summary>
242 public delegate void SendHandler( string uid, string result);
243
244 /// <summary>
245 /// 完成接收的委托
246 /// </summary>
247 public delegate void RecevieHandler( string uid, string data);
248
249 /// <summary>
250 /// 完成接受事件
251 /// </summary>
252 public event AcceptHandler OnAccept;
253
254 /// <summary>
255 /// 完成发送事件
256 /// </summary>
257 public event SendHandler OnSend;
258
259 /// <summary>
260 /// 完成接收事件
261 /// </summary>
262 public event RecevieHandler OnReceive;
263
264 /// <summary>
265 /// 构造函数
266 /// </summary>
267 /// <param name="buffersize"> 单元缓冲区大小 </param>
268 /// <param name="maxCount"> 并发总数 </param>
269 public SocketPoolController( int buffersize, int maxCount)
270 {
271 buffer = new BufferManager(buffersize * maxCount,buffersize);
272 this .currentConnect = 0 ;
273 this .maxConnect = maxCount;
274 this .currentConnect = 0 ;
275 this .pool = new SocketAsyncEventArgsPool();
276 // 设置并发数信号,经试验过是并发数-1才对
277 this .semaphoreAccept = new Semaphore(maxCount- 1 , maxCount- 1 );
278 InitPool();
279 }
280
281 /// <summary>
282 /// 初始化SocketAsyncEventArgs池
283 /// 这里主要是给空闲栈填充足够的实例
284 /// </summary>
285 private void InitPool()
286 {
287 ConnectionUnit unit = null ;
288 for ( int i = 0 ; i < maxConnect; i++ )
289 {
290 unit = new ConnectionUnit();
291 unit.Uid = " -1 " ;
292 unit.RecArg = new SocketAsyncEventArgs();
293 unit.RecArg.Completed += new EventHandler<SocketAsyncEventArgs> (RecArg_Completed);
294 unit.SendArg = new SocketAsyncEventArgs();
295 unit.SendArg.Completed += new EventHandler<SocketAsyncEventArgs> (SendArg_Completed);
296 this .pool.Push(unit);
297 }
298 }
299
300 /// <summary>
301 /// 启动池
302 /// </summary>
303 /// <param name="ipAddress"> 服务端的IP </param>
304 /// <param name="port"> 端口 </param>
305 public void RunPool( string ipAddress, int port)
306 {
307 IPEndPoint endpoint = new IPEndPoint(IPAddress.Parse(ipAddress), port);
308 server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
309 server.Bind(endpoint);
310 server.Listen( 100 );
311
312 // 调用方法异步Accept客户端的连接
313 MyAsyncAccept();
314 // 设置信号,防止再池在已经启动的情况下再次启动
315 mutex.WaitOne();
316 }
317
318 /// <summary>
319 /// 异步Accept客户端的连接
320 /// </summary>
321 void MyAsyncAccept()
322 {
323 // 这里使用Action的方式异步循环接受客户端的连接
324 // 模仿同事的做法没开线程,不知这种方式是好是坏
325 Action callback = new Action( delegate ()
326 {
327 while ( true )
328 {
329 // 每次接受都要新开一个SocketAsyncEventArgs,否则会报错
330 // 其实我也想重复利用的
331 SocketAsyncEventArgs e = new SocketAsyncEventArgs();
332 e.Completed += new EventHandler<SocketAsyncEventArgs> (Accept_Completed);
333
334 acceptLock.Reset();
335 server.AcceptAsync(e);
336 // 在异步接受完成之前阻塞当前线程
337 acceptLock.WaitOne();
338 }
339 });
340 callback.BeginInvoke( null , null );
341 }
342
343
344 /// <summary>
345 /// 停止池
346 /// </summary>
347 public void StopPool()
348 {
349 // 把服务端的socket关了
350 if (server != null )
351 server.Close();
352 // 释放互斥信号,等待下次启动
353 mutex.ReleaseMutex();
354 // 释放资源
355 Dispose();
356 }
357
358 /// <summary>
359 /// 发送消息
360 /// </summary>
361 /// <param name="uid"></param>
362 /// <param name="message"></param>
363 public void SendMessage( string uid, string message)
364 {
365 sendLock.Reset();
366 ConnectionUnit unit= pool.GetConnectionUnitByUID(uid);
367 // 如果获取不了连接单元就不发送了,
368 if (unit == null )
369 {
370 if (OnSend!= null ) OnSend(uid, " 100 " );
371 sendLock.Set();
372 return ;
373 }
374 byte [] datas = Encoding.ASCII.GetBytes(message);
375 unit.SendArg.SetBuffer(datas, 0 , datas.Length);
376 unit.client.SendAsync(unit.SendArg);
377 // 阻塞当前线程,等到发送完成才释放
378 sendLock.WaitOne();
379 }
380
381 void SendArg_Completed( object sender, SocketAsyncEventArgs e)
382 {
383 Socket client = sender as Socket;
384 ConnectionUnit unit = e.UserToken as ConnectionUnit;
385 // 这里的消息码有三个,2字头的是成功的,1字头是不成功的
386 // 101是未知错误,100是客户端不在线
387 if (e.SocketError == SocketError.Success)
388 if (OnSend != null ) OnSend(unit.Uid, " 200 " );
389 else if (OnSend != null ) OnSend(unit.Uid, " 101 " );
390 // 释放信号,以便下次发送消息执行
391 sendLock.Set();
392 }
393
394 void RecArg_Completed( object sender, SocketAsyncEventArgs e)
395 {
396 Socket client = sender as Socket;
397 ConnectionUnit unit = e.UserToken as ConnectionUnit;
398 // 这里大致与上一篇异步通信的一样,只是对缓冲区的处理有一点差异
399 if (e.SocketError == SocketError.Success)
400 {
401 int rec = e.BytesTransferred;
402 if (rec == 0 )
403 {
404 CloseSocket(unit);
405 return ;
406 }
407 if (client.Available > 0 )
408 {
409 unit.tempArray.AddRange(e.Buffer);
410 buffer.FreeBuffer(unit.RecArg);
411 buffer.SetBuffer(unit.RecArg);
412 client.SendAsync(unit.RecArg);
413 return ;
414 }
415 byte [] data = e.Buffer;
416 int len = rec;
417 if (unit.tempArray.Count != 0 )
418 {
419 foreach ( byte item in data)
420 {
421 if (item == 0 ) break ;
422 unit.tempArray.Add(item);
423 }
424 data = unit.tempArray.ToArray( typeof ( byte )) as byte [];
425 rec = data.Length;
426 unit.tempArray.Clear();
427 }
428
429 string dataStr = Encoding.ASCII.GetString(data, 0 , len);
430 if (OnReceive != null )
431 OnReceive(unit.Uid, dataStr);
432
433 if (!unit.State) return ;
434 buffer.FreeBuffer(e);
435 buffer.SetBuffer(e);
436 client.ReceiveAsync(e);
437 }
438 // 这里还多个了一个关闭当前连接
439 else
440 {
441 CloseSocket(unit);
442 }
443 }
444
445 void Accept_Completed( object sender, SocketAsyncEventArgs e)
446 {
447 Socket client = e.AcceptSocket;
448 try
449 {
450 if (client.Connected)
451 {
452 IPEndPoint point = client.RemoteEndPoint as IPEndPoint;
453 string uid = point.Address + " : " + point.Port;
454 ConnectionUnit unit = pool.Pop(uid);
455 unit.client = client;
456 unit.State = true ;
457 unit.Uid = uid;
458 unit.RecArg.UserToken = unit;
459 unit.SendArg.UserToken = unit;
460 buffer.SetBuffer(unit.RecArg);
461
462 // 在接受成功之后就开始接收数据了
463 client.ReceiveAsync(unit.RecArg);
464 // 设置并发限制信号和增加当前连接数
465 semaphoreAccept.WaitOne();
466 Interlocked.Increment( ref currentConnect);
467
468 if (OnAccept != null ) OnAccept(uid);
469 }
470 else if (client != null )
471 {
472 client.Close();
473 client.Dispose();
474 client = null ;
475 }
476 }
477 catch (Exception ex) { Console.WriteLine(ex.ToString()); }
478 // 设置Accept信号,以便下次Accept的执行
479 acceptLock.Set();
480 e.Dispose();
481 }
482
483 /// <summary>
484 /// 关闭一个连接单元
485 /// </summary>
486 private void CloseSocket( ConnectionUnit unit )
487 {
488 // 关闭并释放客户端socket的字眼
489 if (unit.client != null )
490 {
491 unit.client.Shutdown(SocketShutdown.Both);
492 unit.client.Dispose();
493 unit.client = null ;
494 }
495 // Console.WriteLine(unit.Uid+" disconnect ");
496 // 把连接放回连接池
497 pool.Push(unit);
498 // 释放并发信号
499 semaphoreAccept.Release();
500 // 减少当前连接数
501 Interlocked.Decrement( ref currentConnect);
502 }
503
504 public void Dispose()
505 {
506 if (pool != null )
507 {
508 pool.Dispose();
509 pool = null ;
510 }
511 if (buffer != null )
512 {
513 buffer.Dispose();
514 buffer = null ;
515 }
516 if (server != null )
517 {
518 server.Dispose();
519 server = null ;
520 }
521
522 }
523 }
分类: C# , Socket编程
作者: Leo_wl
出处: http://HdhCmsTestcnblogs测试数据/Leo_wl/
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
版权信息