好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

C#中一个高性能异步socket封装库的实现思路分享

前言

socket是软件之间通讯最常用的一种方式。c#实现socket通讯有很多种方法,其中效率最高的就是异步通讯。

异步通讯实际是利用windows完成端口(iocp)来处理的,关于完成端口实现原理,大家可以参考网上文章。

我这里想强调的是采用完成端口机制的异步通讯是windows下效率最高的通讯方式,没有之一!

 

异步通讯比同步通讯处理要难很多,代码编写中会遇到许多"坑"。如果没有经验,很难完成。

我搜集了大量资料,完成了对异步socket的封装。此库已稳定高效地运行了几个月。

 

纵观网上的资料,我还没有遇到一个满意的封装库。许多文章把数据收发和协议处理杂糅在一块,代码非常难懂,也无法扩展。

在编写该库时,避免以上缺陷。将逻辑处理层次化,模块化!同时实现了高可用性与高性能。

 

为了使大家对通讯效率有初步了解,先看测试图。

主机配置情况

百兆带宽基本占满,cpu占用40%,我的电脑在空闲时,cpu占用大概20%,也就是说程序占用cpu 20%左右。

这个库是可扩展的,就是说即使10万个连接,收发同样的数据,cpu占用基本相同。

 

库的结构图

目标

既可作为服务端(监听)也可以作为客户端(主动连接)使用。

可以适应任何网络协议。收发的数据针对字节流或一个完整的包。对协议内容不做处理。

高可用性。将复杂的底层处理封装,对外接口非常友好。

高性能。最大限度优化处理。单机可支持数万连接,收发速度可达几百兆bit。

实现思路

网络处理逻辑可以分为以下几个部分:

网络监听 可以在多个端口实现监听。负责生成socket,生成的socket供后续处理。监听模块功能比较单一,如有必要,可对监听模块做进一步优化。

主动连接 可以异步或同步的连接对方。连接成功后,对socket的后续处理,与监听得到的socket完全一样。注:无论是监听得到的socket,还是连接得到的socket,后续处理完全一样。

socket收发处理 每个socket对应一个收发实例,socket收发只针对字节流处理。收发时,做了优化。比如发送时,对数据做了粘包(把多个小数据块合并后一次发送),提高发送性能;接收时,一次投递1k的数据。

组包处理 一般数据包都有包长度指示;比如报头的前两个字节表示长度,根据这个值就可以组成一个完整的包。

netlistener 监听

?

using system;

using system.net;

using system.net.sockets;

using system.threading;

 

namespace iocpcore

{

  class netlistener

  {

   private socket listensocket;

   public listenparam _listenparam { get; set; }

   public event action< listenparam , asyncsocketclient> onacceptsocket;

 

   bool start;

 

   netserver _netserver;

   public netlistener(netserver netserver)

   {

    _netserver = netserver;

   }

 

   public int _acceptasynccount = 0;

   public bool startlisten()

   {

    try

    {

     start = true;

     ipendpoint listenpoint = new ipendpoint(ipaddress.parse("0.0.0.0"), _listenparam._port);

     listensocket = new socket(listenpoint.addressfamily, sockettype.stream, protocoltype.tcp);

     listensocket.bind(listenpoint);

     listensocket.listen(200);

 

     thread thread1 = new thread(new threadstart(netprocess));

     thread1.start();

    

     startaccept();

     return true;

    }

    catch (exception ex)

    {

     netlogger.log(string.format("**监听异常!{0}", ex.message));

     return false;

    }

   }

 

   autoresetevent _acceptevent = new autoresetevent(false);

   private void netprocess()

   {

    while (start)

    {

     dealnewaccept();

     _acceptevent.waitone(1000 * 10);

    }

   }

 

   private void dealnewaccept()

   {

    try

    {

     if(_acceptasynccount <= 10)

     {

      startaccept();

     }

 

     while (true)

     {

      asyncsocketclient client = _newsocketclientlist.getobj();

      if (client == null)

       break;

 

      dealnewaccept(client);

     }

    }

    catch (exception ex)

    {

     netlogger.log(string.format("dealnewaccept 异常 {0}***{1}", ex.message, ex.stacktrace));

    }

   }

 

   private void dealnewaccept(asyncsocketclient client)

   {

    client.sendbufferbytecount = _netserver.sendbufferbyteperclient;

    onacceptsocket?.invoke(_listenparam, client);

   }

 

   private void accepteventarg_completed(object sender, socketasynceventargs accepteventargs)

   {

    try

    {

     interlocked.decrement(ref _acceptasynccount);

     _acceptevent.set();

     accepteventargs测试数据pleted -= accepteventarg_completed;

     processaccept(accepteventargs);

    }

    catch (exception ex)

    {

     netlogger.log(string.format("accepteventarg_completed {0}***{1}", ex.message, ex.stacktrace));

    }

   }

 

   public bool startaccept()

   {

    socketasynceventargs accepteventargs = new socketasynceventargs();

    accepteventargs测试数据pleted += accepteventarg_completed;

 

    bool willraiseevent = listensocket.acceptasync(accepteventargs);

    interlocked.increment(ref _acceptasynccount);

 

    if (!willraiseevent)

    {

     interlocked.decrement(ref _acceptasynccount);

     _acceptevent.set();

     accepteventargs测试数据pleted -= accepteventarg_completed;

     processaccept(accepteventargs);

    }

    return true;

   }

 

   objectpool< asyncsocketclient > _newsocketclientlist = new objectpool< asyncsocketclient >();

   private void processaccept(socketasynceventargs accepteventargs)

   {

    try

    {

     using (accepteventargs)

     {

      if (accepteventargs.acceptsocket != null)

      {

       asyncsocketclient client = new asyncsocketclient(accepteventargs.acceptsocket);

       client.createclientinfo(this);

 

       _newsocketclientlist.putobj(client);

       _acceptevent.set();

      }

     }

    }

    catch (exception ex)

    {

     netlogger.log(string.format("processaccept {0}***{1}", ex.message, ex.stacktrace));

    }

   }

  }

}

netconnectmanage连接处理

?

using system;

using system.net;

using system.net.sockets;

 

namespace iocpcore

{

  class netconnectmanage

  {

   public event action< socketeventparam , asyncsocketclient> onsocketconnectevent;

 

   public bool connectasyn(string peerip, int peerport, object tag)

   {

    try

    {

     socket socket = new socket(sockettype.stream, protocoltype.tcp);

     socketasynceventargs socketeventargs = new socketasynceventargs();

     socketeventargs.remoteendpoint = new ipendpoint(ipaddress.parse(peerip), peerport);

     socketeventargs测试数据pleted += socketconnect_completed;

 

     socketclientinfo clientinfo = new socketclientinfo();

     socketeventargs.usertoken = clientinfo;

     clientinfo.peerip = peerip;

     clientinfo.peerport = peerport;

     clientinfo.tag = tag;

 

     bool willraiseevent = socket.connectasync(socketeventargs);

     if (!willraiseevent)

     {

      processconnect(socketeventargs);

      socketeventargs测试数据pleted -= socketconnect_completed;

      socketeventargs.dispose();

     }

     return true;

    }

    catch (exception ex)

    {

     netlogger.log("connectasyn",ex);

     return false;

    }

   }

 

   private void socketconnect_completed(object sender, socketasynceventargs socketeventargs)

   {

    processconnect(socketeventargs);

    socketeventargs测试数据pleted -= socketconnect_completed;

    socketeventargs.dispose();

   }

 

   private void processconnect(socketasynceventargs socketeventargs)

   {

    socketclientinfo clientinfo = socketeventargs.usertoken as socketclientinfo;

    if (socketeventargs.socketerror == socketerror.success)

    {

     dealconnectsocket(socketeventargs.connectsocket, clientinfo);

    }

    else

    {

     socketeventparam socketparam = new socketeventparam(en_socketevent.connect, null);

     socketparam.clientinfo = clientinfo;

     onsocketconnectevent?.invoke(socketparam, null);

    }

   }

 

 

   void dealconnectsocket(socket socket, socketclientinfo clientinfo)

   {

    clientinfo.setclientinfo(socket);

 

    asyncsocketclient client = new asyncsocketclient(socket);

    client.setclientinfo(clientinfo);

 

    //触发事件

    socketeventparam socketparam = new socketeventparam(en_socketevent.connect, socket);

    socketparam.clientinfo = clientinfo;

    onsocketconnectevent?.invoke(socketparam, client);

   }

 

   public bool connect(string peerip, int peerport, object tag, out socket socket)

   {

    socket = null;

    try

    {

     socket sockettmp = new socket(sockettype.stream, protocoltype.tcp);

 

     socketclientinfo clientinfo = new socketclientinfo();

     clientinfo.peerip = peerip;

     clientinfo.peerport = peerport;

     clientinfo.tag = tag;

 

     endpoint remoteep = new ipendpoint(ipaddress.parse(peerip), peerport);

     sockettmp.connect(remoteep);

     if (!sockettmp.connected)

      return false;

 

     dealconnectsocket(sockettmp, clientinfo);

     socket = sockettmp;

     return true;

    }

    catch (exception ex)

    {

     netlogger.log(string.format("连接对方:({0}:{1})出错!", peerip, peerport), ex);

     return false;

    }

   }

  }

}

asyncsocketclient socket收发处理

?

using system;

using system.collections.generic;

using system.diagnostics;

using system.net;

using system.net.sockets;

 

namespace iocpcore

{

  public class asyncsocketclient

  {

   public static int iocpreadlen = 1024;

 

   public readonly socket connectsocket;

 

   protected socketasynceventargs m_receiveeventargs;

   public socketasynceventargs receiveeventargs { get { return m_receiveeventargs; } set { m_receiveeventargs = value; } }

   protected byte[] m_asyncreceivebuffer;

 

   protected socketasynceventargs m_sendeventargs;

   public socketasynceventargs sendeventargs { get { return m_sendeventargs; } set { m_sendeventargs = value; } }

   protected byte[] m_asyncsendbuffer;

 

   public event action< asyncsocketclient , byte[]> onreaddata;

   public event action< asyncsocketclient , int> onsenddata;

   public event action< asyncsocketclient > onsocketclose;

 

   static object releaselock = new object();

   public static int createcount = 0;

   public static int releasecount = 0;

 

   ~asyncsocketclient()

   {

    lock (releaselock)

    {

     releasecount++;

    }

   }

 

   public asyncsocketclient(socket socket)

   {

    lock (releaselock)

    {

     createcount++;

    }

 

    connectsocket = socket;

 

    m_receiveeventargs = new socketasynceventargs();

    m_asyncreceivebuffer = new byte[iocpreadlen];

    m_receiveeventargs.acceptsocket = connectsocket;

    m_receiveeventargs测试数据pleted += receiveeventargs_completed;

 

    m_sendeventargs = new socketasynceventargs();

    m_asyncsendbuffer = new byte[iocpreadlen * 2];

    m_sendeventargs.acceptsocket = connectsocket;

    m_sendeventargs测试数据pleted += sendeventargs_completed;

   }

 

   socketclientinfo _clientinfo;

 

   public socketclientinfo clientinfo

   {

    get

    {

     return _clientinfo;

    }

   }

 

   internal void createclientinfo(netlistener netlistener)

   {

    _clientinfo = new socketclientinfo();

    try

    {

     _clientinfo.tag = netlistener._listenparam._tag;

     ipendpoint ip = connectsocket.localendpoint as ipendpoint;

     debug.assert(netlistener._listenparam._port == ip.port);

 

     _clientinfo.localip = ip.address.tostring();

     _clientinfo.localport = netlistener._listenparam._port;

 

     ip = connectsocket.remoteendpoint as ipendpoint;

     _clientinfo.peerip = ip.address.tostring();

     _clientinfo.peerport = ip.port;

    }

    catch (exception ex)

    {

     netlogger.log("createclientinfo", ex);

    }

   }

   internal void setclientinfo(socketclientinfo clientinfo)

   {

    _clientinfo = clientinfo;

   }

 

   #region read process

   bool _inreadpending = false;

   public en_socketreadresult readnextdata()

   {

    lock (this)

    {

     if (_socketerror)

      return en_socketreadresult.readerror;

     if (_inreadpending)

      return en_socketreadresult.inasyn;

     if(!connectsocket.connected)

     {

      onreaderror();

      return en_socketreadresult.readerror;

     }

 

     try

     {

      m_receiveeventargs.setbuffer(m_asyncreceivebuffer, 0, m_asyncreceivebuffer.length);

      _inreadpending = true;

      bool willraiseevent = connectsocket.receiveasync(receiveeventargs); //投递接收请求

      if (!willraiseevent)

      {

       _inreadpending = false;

       processreceive();

       if (_socketerror)

       {

        onreaderror();

        return en_socketreadresult.readerror;

       }

       return en_socketreadresult.haveread;

      }

      else

      {

       return en_socketreadresult.inasyn;

      }

     }

     catch (exception ex)

     {

      netlogger.log("readnextdata", ex);

      _inreadpending = false;

      onreaderror();

      return en_socketreadresult.readerror;

     }

    }

   }

 

   private void processreceive()

   {

    if (receiveeventargs.bytestransferred > 0

     && receiveeventargs.socketerror == socketerror.success)

    {

     int offset = receiveeventargs.offset;

     int count = receiveeventargs.bytestransferred;

 

     byte[] readdata = new byte[count];

     array.copy(m_asyncreceivebuffer, offset, readdata, 0, count);

 

     _inreadpending = false;

     if (!_socketerror)

      onreaddata?.invoke(this, readdata);

    }

    else

    {

     _inreadpending = false;

     onreaderror();

    }

   }

 

   private void receiveeventargs_completed(object sender, socketasynceventargs e)

   {

    lock (this)

    {

     _inreadpending = false;

     processreceive();

     if (_socketerror)

     {

      onreaderror();

     }

    }

   }

 

   bool _socketerror = false;

   private void onreaderror()

   {

    lock (this)

    {

     if (_socketerror == false)

     {

      _socketerror = true;

      onsocketclose?.invoke(this);

     }

     closeclient();

    }

   }

   #endregion

 

   #region send process

   int _sendbufferbytecount = 102400;

   public int sendbufferbytecount

   {

    get

    {

     return _sendbufferbytecount;

    }

    set

    {

     if (value < 1024 )

     {

      _sendbufferbytecount = 1024 ;

     }

     else

     {

      _sendbufferbytecount = value ;

     }

    }

   }

 

   sendbufferpool _senddatapool = new sendbufferpool();

   internal en_senddataresult putsenddata(byte[] data)

   {

    if (_socketerror)

     return en_senddataresult.no_client;

 

    if (_senddatapool._bufferbytecount >= _sendbufferbytecount)

    {

     return en_senddataresult.buffer_overflow;

    }

 

    if (data.length <= iocpreadlen)

    {

     _senddatapool.putobj(data);

    }

    else

    {

     list< byte []> dataitems = splitdata(data, iocpreadlen);

     foreach (byte[] item in dataitems)

     {

      _senddatapool.putobj(item);

     }

    }

 

    return en_senddataresult.ok;

   }

 

   bool _insendpending = false;

   public en_socketsendresult sendnextdata()

   {

    lock (this)

    {

     if (_socketerror)

     {

      return en_socketsendresult.senderror;

     }

 

     if (_insendpending)

     {

      return en_socketsendresult.inasyn;

     }

 

     int sendbytecount = getsenddata();

     if (sendbytecount == 0)

     {

      return en_socketsendresult.nosenddata;

     }

 

     //防止抛出异常,否则影响性能

     if (!connectsocket.connected)

     {

      onsenderror();

      return en_socketsendresult.senderror;

     }

 

     try

     {

      m_sendeventargs.setbuffer(m_asyncsendbuffer, 0, sendbytecount);

      _insendpending = true;

      bool willraiseevent = connectsocket.sendasync(m_sendeventargs);

      if (!willraiseevent)

      {

       _insendpending = false;

       processsend(m_sendeventargs);

       if (_socketerror)

       {

        onsenderror();

        return en_socketsendresult.senderror;

       }

       else

       {

        onsenddata?.invoke(this, sendbytecount);

        //继续发下一条

        return en_socketsendresult.havesend;

       }

      }

      else

      {

       return en_socketsendresult.inasyn;

      }

     }

     catch (exception ex)

     {

      netlogger.log("sendnextdata", ex);

      _insendpending = false;

      onsenderror();

      return en_socketsendresult.senderror;

     }

    }

   }

 

   private void sendeventargs_completed(object sender, socketasynceventargs sendeventargs)

   {

    lock (this)

    {

     try

     {

      _insendpending = false;

      processsend(m_sendeventargs);

 

      int sendcount = 0;

      if (sendeventargs.socketerror == socketerror.success)

      {

       sendcount = sendeventargs.bytestransferred;

      }

      onsenddata?.invoke(this, sendcount);

 

      if (_socketerror)

      {

       onsenderror();

      }

     }

     catch (exception ex)

     {

      netlogger.log("sendeventargs_completed", ex);

     }

    }

   }

 

   private bool processsend(socketasynceventargs sendeventargs)

   {

    if (sendeventargs.socketerror == socketerror.success)

    {

     return true;

    }

    else

    {

     onsenderror();

     return false;

    }

   }

 

   private int getsenddata()

   {

    int datalen = 0;

    while (true)

    {

     byte[] data = _senddatapool.getobj();

     if (data == null)

      return datalen;

     array.copy(data, 0, m_asyncsendbuffer, datalen, data.length);

     datalen += data.length;

     if (datalen > iocpreadlen)

      break;

    }

    return datalen;

   }

   private void onsenderror()

   {

    lock (this)

    {

     if (_socketerror == false)

     {

      _socketerror = true;

      onsocketclose?.invoke(this);

     }

     closeclient();

    }

   }

   #endregion

 

   internal void closesocket()

   {

    try

    {

     connectsocket.close();

    }

    catch (exception ex)

    {

     netlogger.log("closesocket", ex);

    }

   }

 

   static object socketcloselock = new object();

   public static int closesendcount = 0;

   public static int closereadcount = 0;

 

   bool _disposesend = false;

   void closesend()

   {

    if (!_disposesend && !_insendpending)

    {

     lock (socketcloselock)

      closesendcount++;

 

     _disposesend = true;

     m_sendeventargs.setbuffer(null, 0, 0);

     m_sendeventargs测试数据pleted -= sendeventargs_completed;

     m_sendeventargs.dispose();

    }

   }

 

   bool _disposeread = false;

   void closeread()

   {

    if (!_disposeread && !_inreadpending)

    {

     lock (socketcloselock)

      closereadcount++;

 

     _disposeread = true;

     m_receiveeventargs.setbuffer(null, 0, 0);

     m_receiveeventargs测试数据pleted -= receiveeventargs_completed;

     m_receiveeventargs.dispose();

    }

   }

   private void closeclient()

   {

    try

    {

     closesend();

     closeread();

     connectsocket.close();

    }

    catch (exception ex)

    {

     netlogger.log("closeclient", ex);

    }

   }

 

   //发送缓冲大小

   private list< byte []> splitdata(byte[] data, int maxlen)

   {

    list< byte []> items = new list< byte []>();

 

    int start = 0;

    while (true)

    {

     int itemlen = math.min(maxlen, data.length - start);

     if (itemlen == 0)

      break;

     byte[] item = new byte[itemlen];

     array.copy(data, start, item, 0, itemlen);

     items.add(item);

 

     start += itemlen;

    }

    return items;

   }

  }

 

  // Outcome of asyncsocketclient.readnextdata.
  public enum en_socketreadresult

  {

   // A receive request is pending; its completion callback will process the data.
   inasyn,

   // A receive completed synchronously and was processed; the caller may read again.
   haveread,

   // The connection is in the error state or the receive failed.
   readerror

  }

 

  // Outcome of asyncsocketclient.sendnextdata.
  public enum en_socketsendresult

  {

   // A send request is pending; its completion callback will report the result.
   inasyn,

   // A batch was sent synchronously; the caller may send the next batch.
   havesend,

   // Nothing is queued for sending.
   nosenddata,

   // The connection is in the error state or the send failed.
   senderror

  }

 

  class sendbufferpool

  {

   objectpool< byte []> _bufferpool = new objectpool< byte []>();

 

   public int64 _bufferbytecount = 0;

   public bool putobj(byte[] obj)

   {

    if (_bufferpool.putobj(obj))

    {

     lock (this)

     {

      _bufferbytecount += obj.length;

     }

     return true;

    }

    else

    {

     return false;

    }

   }

 

   public byte[] getobj()

   {

    byte[] result = _bufferpool.getobj();

    if (result != null)

    {

     lock (this)

     {

      _bufferbytecount -= result.length;

     }

    }

    return result;

   }

  }

}

netserver 聚合其他类

?

using system;

using system.collections.generic;

using system.diagnostics;

using system.linq;

using system.net.sockets;

using system.threading;

 

namespace iocpcore

{

  public class netserver

  {

   public action< socketeventparam > onsocketpacketevent;

 

   //每个连接发送缓冲大小

   public int sendbufferbyteperclient { get; set; } = 1024 * 100;

 

   bool _serverstart = false;

   list< netlistener > _listlistener = new list< netlistener >();

 

   //负责对收到的字节流 组成完成的包

   clientpacketmanage _clientpacketmanage;

 

   public int64 sendbytecount { get; set; }

   public int64 readbytecount { get; set; }

 

   list< listenparam > _listlistenport = new list< listenparam >();

   public void addlistenport(int port, object tag)

   {

    _listlistenport.add(new listenparam(port, tag));

   }

   /// < summary >

   ///

   /// </ summary >

   /// < param name = "listenfault" >监听失败的端口</ param >

   /// < returns ></ returns >

   public bool startlisten(out list< int > listenfault)

   {

    _serverstart = true;

 

    _clientpacketmanage = new clientpacketmanage(this);

    _clientpacketmanage.onsocketpacketevent += putclientpacket;

 

    _netconnectmanage.onsocketconnectevent += socketconnectevent;

 

    _listlistener.clear();

    thread thread1 = new thread(new threadstart(netpacketprocess));

    thread1.start();

 

    thread thread2 = new thread(new threadstart(netsendprocess));

    thread2.start();

 

    thread thread3 = new thread(new threadstart(netreadprocess));

    thread3.start();

 

    listenfault = new list< int >();

    foreach (listenparam param in _listlistenport)

    {

     netlistener listener = new netlistener(this);

     listener._listenparam = param;

     listener.onacceptsocket += listener_onacceptsocket;

     if (!listener.startlisten())

     {

      listenfault.add(param._port);

     }

     else

     {

      _listlistener.add(listener);

      netlogger.log(string.format("监听成功!端口:{0}", param._port));

     }

    }

 

    return listenfault.count == 0;

   }

 

   public void putclientpacket(socketeventparam param)

   {

    onsocketpacketevent?.invoke(param);

   }

 

   //获取包的最小长度

   int _packetminlen;

   int _packetmaxlen;

   public int packetminlen

   {

    get { return _packetminlen; }

   }

   public int packetmaxlen

   {

    get { return _packetmaxlen; }

   }

 

   /// < summary >

   /// 设置包的最小和最大长度

   /// 当minlen=0时,认为是接收字节流

   /// </ summary >

   /// < param name = "minlen" ></ param >

   /// < param name = "maxlen" ></ param >

   public void setpacketparam(int minlen, int maxlen)

   {

    debug.assert(minlen >= 0);

    debug.assert(maxlen > minlen);

    _packetminlen = minlen;

    _packetmaxlen = maxlen;

   }

 

   //获取包的总长度

   public delegate int delegate_getpackettotallen(byte[] data, int offset);

   public delegate_getpackettotallen getpackettotallen_callback;

 

   objectpoolwithevent< socketeventparam > _socketeventpool = new objectpoolwithevent< socketeventparam >();

   private void netpacketprocess()

   {

    while (_serverstart)

    {

     try

     {

      dealeventpool();

     }

     catch (exception ex)

     {

      netlogger.log(string.format("dealeventpool 异常 {0}***{1}", ex.message, ex.stacktrace));

     }

     _socketeventpool.waitone(1000);

    }

   }

 

   dictionary< socket , asyncsocketclient> _clientgroup = new dictionary< socket , asyncsocketclient>();

   public int clientcount

   {

    get

    {

     lock (_clientgroup)

     {

      return _clientgroup.count;

     }

    }

   }

   public list< socket > clientlist

   {

    get

    {

     lock (_clientgroup)

     {

      return _clientgroup.keys.tolist();

     }

    }

   }

 

   private void dealeventpool()

   {

    while (true)

    {

     socketeventparam param = _socketeventpool.getobj();

     if (param == null)

      return;

 

     if (param.socketevent == en_socketevent.close)

     {

      lock (_clientgroup)

      {

       _clientgroup.remove(param.socket);

      }

     }

 

     if (_packetminlen == 0)//字节流处理

     {

      onsocketpacketevent?.invoke(param);

     }

     else

     {

      //组成一个完整的包 逻辑

      _clientpacketmanage.putsocketparam(param);

     }

    }

   }

 

   private void socketconnectevent(socketeventparam param, asyncsocketclient client)

   {

    try

    {

     if (param.socket == null || client == null) //连接失败

     {

     

     }

     else

     {

      lock (_clientgroup)

      {

       bool remove = _clientgroup.remove(client.connectsocket);

       debug.assert(!remove);

       _clientgroup.add(client.connectsocket, client);

      }

 

      client.onsocketclose += client_onsocketclose;

      client.onreaddata += client_onreaddata;

      client.onsenddata += client_onsenddata;

 

      _listreadevent.putobj(new socketeventdeal(client, en_socketdealevent.read));

     }

     _socketeventpool.putobj(param);

    }

    catch (exception ex)

    {

     netlogger.log(string.format("socketconnectevent 异常 {0}***{1}", ex.message, ex.stacktrace));

    }

   }

 

   internal void onrcvpacketlenerror(socket socket, byte[] buffer, int offset, int packetlen)

   {

    try

    {

     lock (_clientgroup)

     {

      if (!_clientgroup.containskey(socket))

      {

       debug.assert(false);

       return;

      }

 

      netlogger.log(string.format("报长度异常!包长:{0}", packetlen));

      asyncsocketclient client = _clientgroup[socket];

      client.closesocket();

     }

    }

    catch (exception ex)

    {

     netlogger.log(string.format("onrcvpacketlenerror 异常 {0}***{1}", ex.message, ex.stacktrace));

    }

   }

 

   #region listen port

   private void listener_onacceptsocket(listenparam listenpatam, asyncsocketclient client)

   {

    try

    {

     lock (_clientgroup)

     {

      bool remove = _clientgroup.remove(client.connectsocket);

      debug.assert(!remove);

      _clientgroup.add(client.connectsocket, client);

     }

 

     client.onsocketclose += client_onsocketclose;

     client.onreaddata += client_onreaddata;

     client.onsenddata += client_onsenddata;

 

     _listreadevent.putobj(new socketeventdeal(client, en_socketdealevent.read));

 

     socketeventparam param = new socketeventparam(en_socketevent.accept, client.connectsocket);

     param.clientinfo = client.clientinfo;

 

     _socketeventpool.putobj(param);

    }

    catch (exception ex)

    {

     netlogger.log(string.format("listener_onacceptsocket 异常 {0}***{1}", ex.message, ex.stacktrace));

    }

   }

 

 

   objectpoolwithevent< socketeventdeal > _listsendevent = new objectpoolwithevent< socketeventdeal >();

   private void netsendprocess()

   {

    while (true)

    {

     dealsendevent();

     _listsendevent.waitone(1000);

    }

   }

 

   objectpoolwithevent< socketeventdeal > _listreadevent = new objectpoolwithevent< socketeventdeal >();

   private void netreadprocess()

   {

    while (true)

    {

     dealreadevent();

     _listreadevent.waitone(1000);

    }

   }

 

  

   private void dealsendevent()

   {

    while (true)

    {

     socketeventdeal item = _listsendevent.getobj();

     if (item == null)

      break;

     switch (item.socketevent)

     {

      case en_socketdealevent.send:

       {

        while (true)

        {

         en_socketsendresult result = item.client.sendnextdata();

         if (result == en_socketsendresult.havesend)

          continue;

         else

          break;

        }

       }

       break;

      case en_socketdealevent.read:

       {

        debug.assert(false);

       }

       break;    

     }

    }

   }

 

   private void dealreadevent()

   {

    while (true)

    {

     socketeventdeal item = _listreadevent.getobj();

     if (item == null)

      break;

     switch (item.socketevent)

     {

      case en_socketdealevent.read:

       {

        while (true)

        {

         en_socketreadresult result = item.client.readnextdata();

         if (result == en_socketreadresult.haveread)

          continue;

         else

          break;

        }

       }

       break;

      case en_socketdealevent.send:

       {

        debug.assert(false);

       }

       break;

     }

    }

   }

 

   private void client_onreaddata(asyncsocketclient client, byte[] readdata)

   {

    //读下一条

    _listreadevent.putobj(new socketeventdeal(client, en_socketdealevent.read));

 

    try

    {

     socketeventparam param = new socketeventparam(en_socketevent.read, client.connectsocket);

     param.clientinfo = client.clientinfo;

     param.data = readdata;

     _socketeventpool.putobj(param);

 

     lock (this)

     {

      readbytecount += readdata.length;

     }

    }

    catch (exception ex)

    {

     netlogger.log(string.format("client_onreaddata 异常 {0}***{1}", ex.message, ex.stacktrace));

    }

   }

#endregion

 

   private void client_onsenddata(asyncsocketclient client, int sendcount)

   {

    //发送下一条

    _listsendevent.putobj(new socketeventdeal(client, en_socketdealevent.send));

    lock (this)

    {

     sendbytecount += sendcount;

    }

   }

 

   private void client_onsocketclose(asyncsocketclient client)

   {

    try

    {

     socketeventparam param = new socketeventparam(en_socketevent.close, client.connectsocket);

     param.clientinfo = client.clientinfo;

     _socketeventpool.putobj(param);

    }

    catch (exception ex)

    {

     netlogger.log(string.format("client_onsocketclose 异常 {0}***{1}", ex.message, ex.stacktrace));

    }

   }

 

   /// < summary >

   /// 放到发送缓冲

   /// </ summary >

   /// < param name = "socket" ></ param >

   /// < param name = "data" ></ param >

   /// < returns ></ returns >

   public en_senddataresult senddata(socket socket, byte[] data)

   {

    if (socket == null)

     return en_senddataresult.no_client;

    lock (_clientgroup)

    {

     if (!_clientgroup.containskey(socket))

      return en_senddataresult.no_client;

     asyncsocketclient client = _clientgroup[socket];

     en_senddataresult result = client.putsenddata(data);

     if (result == en_senddataresult.ok)

     {

      //发送下一条

      _listsendevent.putobj(new socketeventdeal(client, en_socketdealevent.send));    

     }

     return result;

    }

   }

 

   /// < summary >

   /// 设置某个连接的发送缓冲大小

   /// </ summary >

   /// < param name = "socket" ></ param >

   /// < param name = "bytecount" ></ param >

   /// < returns ></ returns >

   public bool setclientsendbuffer(socket socket, int bytecount)

   {

    lock (_clientgroup)

    {

     if (!_clientgroup.containskey(socket))

      return false;

     asyncsocketclient client = _clientgroup[socket];

     client.sendbufferbytecount = bytecount;

     return true;

    }

   }

 

 

   #region connect process

   netconnectmanage _netconnectmanage = new netconnectmanage();

   /// < summary >

   /// 异步连接一个客户端

   /// </ summary >

   /// < param name = "peerip" ></ param >

   /// < param name = "peerport" ></ param >

   /// < param name = "tag" ></ param >

   /// < returns ></ returns >

   public bool connectasyn(string peerip, int peerport, object tag)

   {

    return _netconnectmanage.connectasyn(peerip, peerport, tag);

   }

 

   /// < summary >

   /// 同步连接一个客户端

   /// </ summary >

   /// < param name = "peerip" ></ param >

   /// < param name = "peerport" ></ param >

   /// < param name = "tag" ></ param >

   /// < param name = "socket" ></ param >

   /// < returns ></ returns >

   public bool connect(string peerip, int peerport, object tag, out socket socket)

   {

    return _netconnectmanage.connect(peerip, peerport, tag, out socket);

   }

   #endregion

  }

 

  // Kind of work item handled by netserver's read and send worker queues.
  enum en_socketdealevent

  {

   // Post the next receive for the client (read queue).
   read,

   // Send the client's next queued batch (send queue).
   send,

  }

  class socketeventdeal

  {

   public asyncsocketclient client { get; set; }

   public en_socketdealevent socketevent { get; set; }

   public socketeventdeal(asyncsocketclient client, en_socketdealevent socketevent)

   {

    client = client;

    socketevent = socketevent;

   }

  }

}

库的使用

使用起来非常简单,示例如下

?

using iocpcore;

using system;

using system.collections.generic;

using system.linq;

using system.net.sockets;

using system.text;

using system.threading.tasks;

using system.windows;

 

namespace warningclient

{

  public class socketserver

  {

   // Optional callback: socketpacketdeal invokes it (when non-null) with every socket event it receives.
   public action< socketeventparam > onsocketevent;

 

   public int64 sendbytecount

   {

    get

    {

     if (_netserver == null)

      return 0;

     return _netserver.sendbytecount;

    }

   }

   public int64 readbytecount

   {

    get

    {

     if (_netserver == null)

      return 0;

     return _netserver.readbytecount;

    }

   }

 

   // Underlying server; remains null until init() creates it.
   netserver _netserver;

   // Last packet type passed to setpackttype; starts as bytestream.
   en_packettype _packettype = en_packettype.bytestream;

   public void setpackttype(en_packettype packettype)

   {

    _packettype = packettype;

    if (_netserver == null)

     return;

    if (packettype == en_packettype.bytestream)

    {

     _netserver.setpacketparam(0, 1024);

    }

    else

    {

     _netserver.setpacketparam(9, 1024);

    }

   }

 

   public bool init(list< int > listenport)

   {

    netlogger.onlogevent += netlogger_onlogevent;

    _netserver = new netserver();

    setpackttype(_packettype);

    _netserver.getpackettotallen_callback += getpackettotallen;

    _netserver.onsocketpacketevent += socketpacketdeal;

 

    foreach (int n in listenport)

    {

     _netserver.addlistenport(n, n);

    }

 

    list< int > listenfault;

    bool start = _netserver.startlisten(out listenfault);

    return start;

   }

 

   int getpackettotallen(byte[] data, int offset)

   {

    if (mainwindow._packettype == en_packettype.znss)

     return getpacketznss(data, offset);

    else

     return getpacketanzhiyuan(data, offset);

   }

 

   int getpacketanzhiyuan(byte[] data, int offset)

   {

    int n = data[offset + 5] + 6;

    return n;

   }

 

   int getpacketznss(byte[] data, int offset)

   {

    int packetlen = (int)(data[4]) + 5;

    return packetlen;

   }

 

 

   public bool connectasyn(string peerip, int peerport, object tag)

   {

    return _netserver.connectasyn(peerip, peerport, tag);

   }

 

   public bool connect(string peerip, int peerport, object tag, out socket socket)

   {

    return _netserver.connect(peerip, peerport, tag, out socket);

   }

 

   private void netlogger_onlogevent(string message)

   {

    applog.log(message);

   }

 

   // Known clients keyed by socket. Filled by addclient, emptied by removeclient;
   // every access in this class happens inside lock (_clientgroup).
   dictionary< socket , socketeventparam> _clientgroup = new dictionary< socket , socketeventparam>();

 

   public int clientcount

   {

    get

    {

     lock (_clientgroup)

     {

      return _clientgroup.count;

     }

    }

   }

   public list< socket > clientlist

   {

    get

    {

     if (_netserver != null)

      return _netserver.clientlist;

     return new list< socket >();

    }

   }

   void addclient(socketeventparam socketparam)

   {

    lock (_clientgroup)

    {

     _clientgroup.remove(socketparam.socket);

     _clientgroup.add(socketparam.socket, socketparam);

    }

   }

 

   void removeclient(socketeventparam socketparam)

   {

    lock (_clientgroup)

    {

     _clientgroup.remove(socketparam.socket);

    }

   }

 

   objectpool< socketeventparam > _readdatapool = new objectpool< socketeventparam >();

 

   public objectpool< socketeventparam > readdatapool

   {

    get

    {

     return _readdatapool;

    }

   }

 

   private void socketpacketdeal(socketeventparam socketparam)

   {

    onsocketevent?.invoke(socketparam);

    if (socketparam.socketevent == en_socketevent.read)

    {

     if (mainwindow._isshowreadpacket)

      _readdatapool.putobj(socketparam);

    }

    else if (socketparam.socketevent == en_socketevent.accept)

    {

     addclient(socketparam);

     string peerip = socketparam.clientinfo.peeripport;

     applog.log(string.format("客户端链接!本地端口:{0},对端:{1}",

      socketparam.clientinfo.localport, peerip));

    }

    else if (socketparam.socketevent == en_socketevent.connect)

    {

     string peerip = socketparam.clientinfo.peeripport;

     if (socketparam.socket != null)

     {

      addclient(socketparam);

 

      applog.log(string.format("连接对端成功!本地端口:{0},对端:{1}",

       socketparam.clientinfo.localport, peerip));

     }

     else

     {

      applog.log(string.format("连接对端失败!本地端口:{0},对端:{1}",

       socketparam.clientinfo.localport, peerip));

     }

    }

    else if (socketparam.socketevent == en_socketevent.close)

    {

     mainwindow.mainwnd.onsocketdisconnect(socketparam.socket);

     removeclient(socketparam);

     string peerip = socketparam.clientinfo.peeripport;

     applog.log(string.format("客户端断开!本地端口:{0},对端:{1},",

      socketparam.clientinfo.localport, peerip));

    }

   }

 

   public en_senddataresult senddata(socket socket, byte[] data)

   {

    if(socket == null)

    {

     messagebox.show("还没连接!");

     return en_senddataresult.no_client;

    }

    return _netserver.senddata(socket, data);

   }

 

   internal void sendtoall(byte[] data)

   {

    lock (_clientgroup)

    {

     foreach (socket socket in _clientgroup.keys)

     {

      senddata(socket, data);

     }

    }

   }

  }

}

以上这篇《C#中一个高性能异步socket封装库的实现思路分享》就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持服务器之家。

原文链接:http://www.cnblogs.com/yuanchenhui/archive/2017/11/28/asyn_scoket.html

dy("nrwz");

查看更多关于C#中一个高性能异步socket封装库的实现思路分享的详细内容...

  阅读:50次