前言
socket是软件之间通讯最常用的一种方式。c#实现socket通讯有很多种方法,其中效率最高的就是异步通讯。
异步通讯实际是利用windows完成端口(iocp)来处理的,关于完成端口实现原理,大家可以参考网上文章。
我这里想强调的是采用完成端口机制的异步通讯是windows下效率最高的通讯方式,没有之一!
异步通讯比同步通讯处理要难很多,代码编写中会遇到许多“坑”。如果没有经验,很难完成。
我搜集了大量资料,完成了对异步socket的封装。此库已稳定高效地运行了几个月。
纵观网上的资料,我还没有遇到一个满意的封装库。许多文章把数据收发和协议处理杂糅在一块,代码非常难懂,也无法扩展。
在编写该库时,避免以上缺陷。将逻辑处理层次化,模块化!同时实现了高可用性与高性能。
为了使大家对通讯效率有初步了解,先看测试图。
主机配置情况
百兆带宽基本占满,cpu占用40%,我的电脑在空闲时,cpu占用大概20%,也就是说程序占用cpu 20%左右。
这个库是可扩展的,就是说即使10万个连接,收发同样的数据,cpu占用基本相同。
库的结构图
目标
既可作为服务端(监听),也可以作为客户端(主动连接)使用。
可以适应任何网络协议。收发的数据针对字节流或一个完整的包。对协议内容不做处理。
高可用性。将复杂的底层处理封装,对外接口非常友好。
高性能。最大限度优化处理。单机可支持数万连接,收发速度可达几百兆bit。
实现思路
网络处理逻辑可以分为以下几个部分:
网络监听 可以在多个端口实现监听。负责生成socket,生成的socket供后续处理。监听模块功能比较单一,如有必要,可对监听模块做进一步优化。
主动连接 可以异步或同步的连接对方。连接成功后,对socket的后续处理,与监听得到的socket完全一样。注:无论是监听得到的socket,还是连接得到的socket,后续处理完全一样。
socket收发处理 每个socket对应一个收发实例,socket收发只针对字节流处理。收发时,做了优化。比如发送时,对数据做了粘包(把多个小包合并后一次发送),提高发送性能;接收时,一次投递1k的数据。
组包处理 一般数据包都有包长度指示;比如 报头的前两个字节表示长度,根据这个值就可以组成一个完整的包。
netlistener 监听
using system;
using system.net;
using system.net.sockets;
using system.threading;
namespace iocpcore
{
class netlistener
{
private socket listensocket;
public listenparam _listenparam { get; set; }
public event action< listenparam , asyncsocketclient> onacceptsocket;
bool start;
netserver _netserver;
public netlistener(netserver netserver)
{
_netserver = netserver;
}
public int _acceptasynccount = 0;
/// <summary>
/// Binds the listening socket to 0.0.0.0 on the configured port, starts the
/// accept-servicing thread (netprocess) and posts the first asynchronous accept.
/// </summary>
/// <returns>
/// true when every step succeeded; false when any step threw. On failure the
/// error is logged and the partially initialised listener is torn down.
/// </returns>
public bool startlisten()
{
    try
    {
        start = true;
        ipendpoint listenpoint = new ipendpoint(ipaddress.parse("0.0.0.0"), _listenparam._port);
        listensocket = new socket(listenpoint.addressfamily, sockettype.stream, protocoltype.tcp);
        listensocket.bind(listenpoint);
        listensocket.listen(200);
        thread thread1 = new thread(new threadstart(netprocess));
        thread1.start();
        startaccept();
        return true;
    }
    catch (exception ex)
    {
        netlogger.log(string.format("**监听异常!{0}", ex.message));
        // Roll back: without this the socket stays bound to the port and, if the
        // worker thread was already started, it keeps looping on a dead listener.
        start = false;
        _acceptevent.set();
        if (listensocket != null)
        {
            listensocket.close();
        }
        return false;
    }
}
autoresetevent _acceptevent = new autoresetevent(false);
/// <summary>
/// Worker-thread loop: services accepted sockets, then sleeps until the accept
/// event is signalled or ten seconds pass. Exits once <c>start</c> is false.
/// </summary>
private void netprocess()
{
    while (true)
    {
        if (!start)
        {
            return;
        }
        dealnewaccept();
        _acceptevent.waitone(1000 * 10);
    }
}
/// <summary>
/// Posts one more asynchronous accept when ten or fewer are outstanding, then
/// drains the queue of freshly accepted clients, handing each one to
/// <c>dealnewaccept(asyncsocketclient)</c>. Any exception is logged and swallowed.
/// </summary>
private void dealnewaccept()
{
    try
    {
        if (!(_acceptasynccount > 10))
        {
            startaccept();
        }
        for (asyncsocketclient pending = _newsocketclientlist.getobj();
             pending != null;
             pending = _newsocketclientlist.getobj())
        {
            dealnewaccept(pending);
        }
    }
    catch (exception ex)
    {
        netlogger.log(string.format("dealnewaccept 异常 {0}***{1}", ex.message, ex.stacktrace));
    }
}
// Finishes setup of one accepted client: copies the server-wide per-client send
// buffer limit onto it, then raises onacceptsocket (if anyone subscribed) with
// this listener's parameters.
private void dealnewaccept(asyncsocketclient client)
{
client.sendbufferbytecount = _netserver.sendbufferbyteperclient;
onacceptsocket?.invoke(_listenparam, client);
}
/// <summary>
/// Completion callback for an asynchronously finished accept. Decrements the
/// outstanding-accept counter, wakes the worker thread, detaches itself from
/// the event args and hands them to <c>processaccept</c>.
/// </summary>
/// <param name="sender">Event source (unused).</param>
/// <param name="accepteventargs">The args object the accept was posted with.</param>
private void accepteventarg_completed(object sender, socketasynceventargs accepteventargs)
{
    try
    {
        interlocked.decrement(ref _acceptasynccount);
        _acceptevent.set();
        // The scraped source had a garbled token here; the member is the
        // "completed" event of the event args.
        accepteventargs.completed -= accepteventarg_completed;
        processaccept(accepteventargs);
    }
    catch (exception ex)
    {
        netlogger.log(string.format("accepteventarg_completed {0}***{1}", ex.message, ex.stacktrace));
    }
}
/// <summary>
/// Posts one asynchronous accept on the listening socket. When the accept
/// completes synchronously the result is processed inline, exactly as the
/// completion callback would do.
/// </summary>
/// <returns>Always true; failures surface as exceptions to the caller.</returns>
public bool startaccept()
{
    socketasynceventargs accepteventargs = new socketasynceventargs();
    accepteventargs.completed += accepteventarg_completed;

    // Count the request BEFORE posting it: the completion callback may run on
    // another thread before acceptasync returns, and it decrements the counter.
    interlocked.increment(ref _acceptasynccount);
    bool willraiseevent;
    try
    {
        willraiseevent = listensocket.acceptasync(accepteventargs);
    }
    catch
    {
        // Nothing was posted: undo the bookkeeping and release the args.
        interlocked.decrement(ref _acceptasynccount);
        accepteventargs.completed -= accepteventarg_completed;
        accepteventargs.dispose();
        throw;
    }

    if (!willraiseevent)
    {
        // Synchronous completion: the completed event will not fire.
        interlocked.decrement(ref _acceptasynccount);
        _acceptevent.set();
        accepteventargs.completed -= accepteventarg_completed;
        processaccept(accepteventargs);
    }
    return true;
}
objectpool< asyncsocketclient > _newsocketclientlist = new objectpool< asyncsocketclient >();
/// <summary>
/// Consumes a finished accept: wraps the accepted socket in an
/// <c>asyncsocketclient</c>, fills in its client info, queues it for the worker
/// thread and signals that thread. The event args are always disposed.
/// </summary>
/// <param name="accepteventargs">Args of the completed accept operation.</param>
private void processaccept(socketasynceventargs accepteventargs)
{
    try
    {
        using (accepteventargs)
        {
            socket accepted = accepteventargs.acceptsocket;
            if (accepted == null)
            {
                return;
            }
            if (accepteventargs.socketerror != socketerror.success)
            {
                // A failed accept can still carry a socket object; it is not a
                // usable connection, so release it instead of publishing it.
                accepted.close();
                return;
            }
            asyncsocketclient client = new asyncsocketclient(accepted);
            client.createclientinfo(this);
            _newsocketclientlist.putobj(client);
            _acceptevent.set();
        }
    }
    catch (exception ex)
    {
        netlogger.log(string.format("processaccept {0}***{1}", ex.message, ex.stacktrace));
    }
}
}
}
netconnectmanage连接处理
using system;
using system.net;
using system.net.sockets;
namespace iocpcore
{
class netconnectmanage
{
public event action< socketeventparam , asyncsocketclient> onsocketconnectevent;
/// <summary>
/// Starts an asynchronous TCP connect to <paramref name="peerip"/>:<paramref name="peerport"/>.
/// The outcome is reported through <c>onsocketconnectevent</c>.
/// </summary>
/// <param name="peerip">Remote IP address in textual form.</param>
/// <param name="peerport">Remote TCP port.</param>
/// <param name="tag">Caller-defined object stored on the client info.</param>
/// <returns>
/// true when the connect was posted (or completed synchronously); false when
/// an exception occurred, which is logged.
/// </returns>
public bool connectasyn(string peerip, int peerport, object tag)
{
    socket socket = null;
    socketasynceventargs socketeventargs = null;
    bool posted = false;
    try
    {
        socket = new socket(sockettype.stream, protocoltype.tcp);
        socketeventargs = new socketasynceventargs();
        socketeventargs.remoteendpoint = new ipendpoint(ipaddress.parse(peerip), peerport);
        // The scraped source had a garbled token here; the member is the
        // "completed" event of the event args.
        socketeventargs.completed += socketconnect_completed;
        socketclientinfo clientinfo = new socketclientinfo();
        socketeventargs.usertoken = clientinfo;
        clientinfo.peerip = peerip;
        clientinfo.peerport = peerport;
        clientinfo.tag = tag;
        bool willraiseevent = socket.connectasync(socketeventargs);
        posted = true;
        if (!willraiseevent)
        {
            // Synchronous completion: the completed event will not fire.
            processconnect(socketeventargs);
            socketeventargs.completed -= socketconnect_completed;
            socketeventargs.dispose();
        }
        return true;
    }
    catch (exception ex)
    {
        netlogger.log("connectasyn", ex);
        if (!posted)
        {
            // Setup failed before the operation was handed to the OS (bad
            // address, socket creation failure, ...): nobody else owns these
            // objects, so release them here instead of leaking them.
            if (socketeventargs != null)
            {
                socketeventargs.completed -= socketconnect_completed;
                socketeventargs.dispose();
            }
            if (socket != null)
            {
                socket.close();
            }
        }
        return false;
    }
}
/// <summary>
/// Completion callback for an asynchronously finished connect: processes the
/// result, then detaches from and disposes the event args.
/// </summary>
/// <param name="sender">Event source (unused).</param>
/// <param name="socketeventargs">Args the connect was posted with.</param>
private void socketconnect_completed(object sender, socketasynceventargs socketeventargs)
{
    processconnect(socketeventargs);
    // The scraped source had a garbled token here; the member is the
    // "completed" event of the event args.
    socketeventargs.completed -= socketconnect_completed;
    socketeventargs.dispose();
}
/// <summary>
/// Inspects a finished connect. On success the connected socket is wrapped and
/// announced via <c>dealconnectsocket</c>; on failure a connect event with a
/// null socket and null client is raised so subscribers can see the failure.
/// </summary>
/// <param name="socketeventargs">Args of the completed connect; its user token carries the client info.</param>
private void processconnect(socketasynceventargs socketeventargs)
{
    socketclientinfo info = socketeventargs.usertoken as socketclientinfo;

    if (socketeventargs.socketerror != socketerror.success)
    {
        socketeventparam failed = new socketeventparam(en_socketevent.connect, null);
        failed.clientinfo = info;
        onsocketconnectevent?.invoke(failed, null);
        return;
    }

    dealconnectsocket(socketeventargs.connectsocket, info);
}
/// <summary>
/// Wraps a successfully connected socket in an <c>asyncsocketclient</c>,
/// attaches the client info to it and raises the connect event.
/// </summary>
/// <param name="socket">The connected socket.</param>
/// <param name="clientinfo">Client info to complete from the socket and attach.</param>
void dealconnectsocket(socket socket, socketclientinfo clientinfo)
{
    clientinfo.setclientinfo(socket);

    asyncsocketclient wrapped = new asyncsocketclient(socket);
    wrapped.setclientinfo(clientinfo);

    // Notify subscribers.
    socketeventparam connected = new socketeventparam(en_socketevent.connect, socket)
    {
        clientinfo = clientinfo
    };
    onsocketconnectevent?.invoke(connected, wrapped);
}
/// <summary>
/// Synchronously connects to <paramref name="peerip"/>:<paramref name="peerport"/>
/// and, on success, announces the new client through the connect event.
/// </summary>
/// <param name="peerip">Remote IP address in textual form.</param>
/// <param name="peerport">Remote TCP port.</param>
/// <param name="tag">Caller-defined object stored on the client info.</param>
/// <param name="socket">Receives the connected socket, or null on failure.</param>
/// <returns>true when connected; false otherwise (exceptions are logged).</returns>
public bool connect(string peerip, int peerport, object tag, out socket socket)
{
    socket = null;
    socket sockettmp = null;
    // Becomes true once the socket has been handed to dealconnectsocket; from
    // then on it is owned by the client wrapper and must not be closed here.
    bool handedoff = false;
    try
    {
        sockettmp = new socket(sockettype.stream, protocoltype.tcp);
        socketclientinfo clientinfo = new socketclientinfo();
        clientinfo.peerip = peerip;
        clientinfo.peerport = peerport;
        clientinfo.tag = tag;
        endpoint remoteep = new ipendpoint(ipaddress.parse(peerip), peerport);
        sockettmp.connect(remoteep);
        if (!sockettmp.connected)
            return false;
        handedoff = true;
        dealconnectsocket(sockettmp, clientinfo);
        socket = sockettmp;
        return true;
    }
    catch (exception ex)
    {
        netlogger.log(string.format("连接对方:({0}:{1})出错!", peerip, peerport), ex);
        return false;
    }
    finally
    {
        // Release the socket on every failure path (bad address, refused
        // connection, not connected) so the handle is not leaked.
        if (!handedoff && sockettmp != null)
        {
            sockettmp.close();
        }
    }
}
}
}
asyncsocketclient socket收发处理
using system;
using system.collections.generic;
using system.diagnostics;
using system.net;
using system.net.sockets;
namespace iocpcore
{
public class asyncsocketclient
{
public static int iocpreadlen = 1024;
public readonly socket connectsocket;
protected socketasynceventargs m_receiveeventargs;
public socketasynceventargs receiveeventargs { get { return m_receiveeventargs; } set { m_receiveeventargs = value; } }
protected byte[] m_asyncreceivebuffer;
protected socketasynceventargs m_sendeventargs;
public socketasynceventargs sendeventargs { get { return m_sendeventargs; } set { m_sendeventargs = value; } }
protected byte[] m_asyncsendbuffer;
public event action< asyncsocketclient , byte[]> onreaddata;
public event action< asyncsocketclient , int> onsenddata;
public event action< asyncsocketclient > onsocketclose;
static object releaselock = new object();
public static int createcount = 0;
public static int releasecount = 0;
~asyncsocketclient()
{
lock (releaselock)
{
releasecount++;
}
}
/// <summary>
/// Wraps a connected socket. Allocates one receive buffer of
/// <c>iocpreadlen</c> bytes and one send buffer of twice that size, and creates
/// the receive/send event args with their completion callbacks attached.
/// Also bumps the static creation counter.
/// </summary>
/// <param name="socket">The socket this instance will read from and write to.</param>
public asyncsocketclient(socket socket)
{
    lock (releaselock)
    {
        createcount++;
    }
    connectsocket = socket;

    m_receiveeventargs = new socketasynceventargs();
    m_asyncreceivebuffer = new byte[iocpreadlen];
    m_receiveeventargs.acceptsocket = connectsocket;
    // The scraped source had a garbled token here; the member is the
    // "completed" event of the event args.
    m_receiveeventargs.completed += receiveeventargs_completed;

    m_sendeventargs = new socketasynceventargs();
    // Twice the read length: getsenddata may append one more chunk of up to
    // iocpreadlen bytes after the buffer already holds up to iocpreadlen bytes.
    m_asyncsendbuffer = new byte[iocpreadlen * 2];
    m_sendeventargs.acceptsocket = connectsocket;
    m_sendeventargs.completed += sendeventargs_completed;
}
socketclientinfo _clientinfo;
public socketclientinfo clientinfo
{
get
{
return _clientinfo;
}
}
internal void createclientinfo(netlistener netlistener)
{
_clientinfo = new socketclientinfo();
try
{
_clientinfo.tag = netlistener._listenparam._tag;
ipendpoint ip = connectsocket.localendpoint as ipendpoint;
debug.assert(netlistener._listenparam._port == ip.port);
_clientinfo.localip = ip.address.tostring();
_clientinfo.localport = netlistener._listenparam._port;
ip = connectsocket.remoteendpoint as ipendpoint;
_clientinfo.peerip = ip.address.tostring();
_clientinfo.peerport = ip.port;
}
catch (exception ex)
{
netlogger.log("createclientinfo", ex);
}
}
internal void setclientinfo(socketclientinfo clientinfo)
{
_clientinfo = clientinfo;
}
#region read process
bool _inreadpending = false;
// Posts the next receive on the socket, serialised by a lock on this instance.
// Returns:
//   readerror - the client is already in the error state, the socket is not
//               connected, the receive failed, or an exception was thrown;
//   inasyn    - a receive is already pending, or the one just posted will
//               complete later through the completion callback;
//   haveread  - the receive completed synchronously and was processed inline.
public en_socketreadresult readnextdata()
{
lock (this)
{
if (_socketerror)
return en_socketreadresult.readerror;
// Only one receive may be outstanding at a time.
if (_inreadpending)
return en_socketreadresult.inasyn;
if(!connectsocket.connected)
{
onreaderror();
return en_socketreadresult.readerror;
}
try
{
m_receiveeventargs.setbuffer(m_asyncreceivebuffer, 0, m_asyncreceivebuffer.length);
// Set before posting so a concurrent caller sees the pending state.
_inreadpending = true;
bool willraiseevent = connectsocket.receiveasync(receiveeventargs); // post the receive request
if (!willraiseevent)
{
// Completed synchronously: the completion callback will not run, so
// process the result here.
_inreadpending = false;
processreceive();
if (_socketerror)
{
onreaderror();
return en_socketreadresult.readerror;
}
return en_socketreadresult.haveread;
}
else
{
return en_socketreadresult.inasyn;
}
}
catch (exception ex)
{
netlogger.log("readnextdata", ex);
_inreadpending = false;
onreaderror();
return en_socketreadresult.readerror;
}
}
}
private void processreceive()
{
if (receiveeventargs.bytestransferred > 0
&& receiveeventargs.socketerror == socketerror.success)
{
int offset = receiveeventargs.offset;
int count = receiveeventargs.bytestransferred;
byte[] readdata = new byte[count];
array.copy(m_asyncreceivebuffer, offset, readdata, 0, count);
_inreadpending = false;
if (!_socketerror)
onreaddata?.invoke(this, readdata);
}
else
{
_inreadpending = false;
onreaderror();
}
}
private void receiveeventargs_completed(object sender, socketasynceventargs e)
{
lock (this)
{
_inreadpending = false;
processreceive();
if (_socketerror)
{
onreaderror();
}
}
}
bool _socketerror = false;
private void onreaderror()
{
lock (this)
{
if (_socketerror == false)
{
_socketerror = true;
onsocketclose?.invoke(this);
}
closeclient();
}
}
#endregion
#region send process
// Upper bound, in bytes, on data queued for sending (checked in putsenddata).
int _sendbufferbytecount = 102400;
/// <summary>
/// Gets or sets the send-queue limit in bytes. Values below 1024 are raised
/// to 1024.
/// </summary>
public int sendbufferbytecount
{
    get { return _sendbufferbytecount; }
    set { _sendbufferbytecount = value < 1024 ? 1024 : value; }
}
sendbufferpool _senddatapool = new sendbufferpool();
/// <summary>
/// Queues <paramref name="data"/> for sending. Payloads longer than
/// <c>iocpreadlen</c> are split into chunks of at most that length first.
/// </summary>
/// <param name="data">Bytes to send.</param>
/// <returns>
/// no_client when the client is in the error state; buffer_overflow when the
/// queued byte count has already reached the configured limit; ok otherwise.
/// </returns>
internal en_senddataresult putsenddata(byte[] data)
{
    if (_socketerror)
    {
        return en_senddataresult.no_client;
    }
    if (!(_senddatapool._bufferbytecount < _sendbufferbytecount))
    {
        return en_senddataresult.buffer_overflow;
    }

    if (data.length > iocpreadlen)
    {
        foreach (byte[] chunk in splitdata(data, iocpreadlen))
        {
            _senddatapool.putobj(chunk);
        }
    }
    else
    {
        _senddatapool.putobj(data);
    }
    return en_senddataresult.ok;
}
bool _insendpending = false;
// Sends the next batch of queued data, serialised by a lock on this instance.
// Returns:
//   senderror  - the client is in the error state, the socket is not
//                connected, the send failed, or an exception was thrown;
//   inasyn     - a send is already pending, or the one just posted will
//                complete later through the completion callback;
//   nosenddata - nothing was queued;
//   havesend   - the send completed synchronously (onsenddata has been raised).
public en_socketsendresult sendnextdata()
{
lock (this)
{
if (_socketerror)
{
return en_socketsendresult.senderror;
}
// Only one send may be outstanding at a time.
if (_insendpending)
{
return en_socketsendresult.inasyn;
}
// Moves queued chunks into m_asyncsendbuffer and returns the byte count.
int sendbytecount = getsenddata();
if (sendbytecount == 0)
{
return en_socketsendresult.nosenddata;
}
// Check the connection first to avoid throwing an exception, which would hurt performance.
if (!connectsocket.connected)
{
onsenderror();
return en_socketsendresult.senderror;
}
try
{
m_sendeventargs.setbuffer(m_asyncsendbuffer, 0, sendbytecount);
// Set before posting so a concurrent caller sees the pending state.
_insendpending = true;
bool willraiseevent = connectsocket.sendasync(m_sendeventargs);
if (!willraiseevent)
{
// Completed synchronously: the completion callback will not run.
_insendpending = false;
processsend(m_sendeventargs);
if (_socketerror)
{
onsenderror();
return en_socketsendresult.senderror;
}
else
{
onsenddata?.invoke(this, sendbytecount);
// Caller should continue with the next batch.
return en_socketsendresult.havesend;
}
}
else
{
return en_socketsendresult.inasyn;
}
}
catch (exception ex)
{
netlogger.log("sendnextdata", ex);
_insendpending = false;
onsenderror();
return en_socketsendresult.senderror;
}
}
}
private void sendeventargs_completed(object sender, socketasynceventargs sendeventargs)
{
lock (this)
{
try
{
_insendpending = false;
processsend(m_sendeventargs);
int sendcount = 0;
if (sendeventargs.socketerror == socketerror.success)
{
sendcount = sendeventargs.bytestransferred;
}
onsenddata?.invoke(this, sendcount);
if (_socketerror)
{
onsenderror();
}
}
catch (exception ex)
{
netlogger.log("sendeventargs_completed", ex);
}
}
}
/// <summary>
/// Checks the result of a finished send and switches the client to the error
/// state (via <c>onsenderror</c>) when it failed.
/// </summary>
/// <param name="sendeventargs">Args of the completed send.</param>
/// <returns>true when the send succeeded; false otherwise.</returns>
private bool processsend(socketasynceventargs sendeventargs)
{
    bool succeeded = sendeventargs.socketerror == socketerror.success;
    if (!succeeded)
    {
        onsenderror();
    }
    return succeeded;
}
/// <summary>
/// Coalesces queued chunks into <c>m_asyncsendbuffer</c>. Chunks are appended
/// until the queue is empty or the accumulated length exceeds
/// <c>iocpreadlen</c>.
/// </summary>
/// <returns>Number of bytes placed in the send buffer (0 when nothing was queued).</returns>
private int getsenddata()
{
    int filled = 0;
    do
    {
        byte[] chunk = _senddatapool.getobj();
        if (chunk == null)
        {
            break;
        }
        array.copy(chunk, 0, m_asyncsendbuffer, filled, chunk.length);
        filled += chunk.length;
    }
    while (filled <= iocpreadlen);
    return filled;
}
private void onsenderror()
{
lock (this)
{
if (_socketerror == false)
{
_socketerror = true;
onsocketclose?.invoke(this);
}
closeclient();
}
}
#endregion
internal void closesocket()
{
try
{
connectsocket.close();
}
catch (exception ex)
{
netlogger.log("closesocket", ex);
}
}
static object socketcloselock = new object();
public static int closesendcount = 0;
public static int closereadcount = 0;
bool _disposesend = false;
/// <summary>
/// Releases the send event args exactly once. Does nothing while a send is
/// still pending or when the args were already released.
/// </summary>
void closesend()
{
    if (!_disposesend && !_insendpending)
    {
        lock (socketcloselock)
        {
            closesendcount++;
        }
        _disposesend = true;
        m_sendeventargs.setbuffer(null, 0, 0);
        // The scraped source had a garbled token here; the member is the
        // "completed" event of the event args.
        m_sendeventargs.completed -= sendeventargs_completed;
        m_sendeventargs.dispose();
    }
}
bool _disposeread = false;
/// <summary>
/// Releases the receive event args exactly once. Does nothing while a receive
/// is still pending or when the args were already released.
/// </summary>
void closeread()
{
    if (!_disposeread && !_inreadpending)
    {
        lock (socketcloselock)
        {
            closereadcount++;
        }
        _disposeread = true;
        m_receiveeventargs.setbuffer(null, 0, 0);
        // The scraped source had a garbled token here; the member is the
        // "completed" event of the event args.
        m_receiveeventargs.completed -= receiveeventargs_completed;
        m_receiveeventargs.dispose();
    }
}
private void closeclient()
{
try
{
closesend();
closeread();
connectsocket.close();
}
catch (exception ex)
{
netlogger.log("closeclient", ex);
}
}
//发送缓冲大小
/// <summary>
/// Cuts <paramref name="data"/> into consecutive copies of at most
/// <paramref name="maxlen"/> bytes each, in order.
/// </summary>
/// <param name="data">Source bytes.</param>
/// <param name="maxlen">Maximum chunk length.</param>
/// <returns>The chunks; empty when <paramref name="data"/> is empty.</returns>
private list<byte[]> splitdata(byte[] data, int maxlen)
{
    list<byte[]> chunks = new list<byte[]>();
    for (int pos = 0; ; )
    {
        int remaining = data.length - pos;
        int len = remaining < maxlen ? remaining : maxlen;
        if (len == 0)
        {
            break;
        }
        byte[] chunk = new byte[len];
        array.copy(data, pos, chunk, 0, len);
        chunks.add(chunk);
        pos += len;
    }
    return chunks;
}
}
// Outcome of asyncsocketclient.readnextdata().
public enum en_socketreadresult
{
// A receive is already pending, or was just posted and will complete asynchronously.
inasyn,
// The receive completed synchronously and was processed inline.
haveread,
// The client is in the error state, is not connected, or the receive failed.
readerror
}
// Outcome of asyncsocketclient.sendnextdata().
public enum en_socketsendresult
{
// A send is already pending, or was just posted and will complete asynchronously.
inasyn,
// The send completed synchronously.
havesend,
// Nothing was queued for sending.
nosenddata,
// The client is in the error state, is not connected, or the send failed.
senderror
}
/// <summary>
/// Queue of outgoing byte chunks that also tracks the total number of queued
/// bytes in <c>_bufferbytecount</c>.
/// </summary>
class sendbufferpool
{
    objectpool<byte[]> _bufferpool = new objectpool<byte[]>();

    // Sum of the lengths of all chunks currently queued.
    public int64 _bufferbytecount = 0;

    /// <summary>Queues a chunk; the byte total grows only when the pool accepted it.</summary>
    /// <returns>Whatever the underlying pool reported.</returns>
    public bool putobj(byte[] obj)
    {
        if (!_bufferpool.putobj(obj))
        {
            return false;
        }
        lock (this)
        {
            _bufferbytecount += obj.length;
        }
        return true;
    }

    /// <summary>Takes the next chunk and shrinks the byte total accordingly.</summary>
    /// <returns>The chunk, or null when the pool returned none.</returns>
    public byte[] getobj()
    {
        byte[] taken = _bufferpool.getobj();
        if (taken == null)
        {
            return taken;
        }
        lock (this)
        {
            _bufferbytecount -= taken.length;
        }
        return taken;
    }
}
}
netserver 聚合其他类
using system;
using system.collections.generic;
using system.diagnostics;
using system.linq;
using system.net.sockets;
using system.threading;
namespace iocpcore
{
public class netserver
{
public action< socketeventparam > onsocketpacketevent;
//每个连接发送缓冲大小
public int sendbufferbyteperclient { get; set; } = 1024 * 100;
bool _serverstart = false;
list< netlistener > _listlistener = new list< netlistener >();
//负责对收到的字节流 组成完成的包
clientpacketmanage _clientpacketmanage;
public int64 sendbytecount { get; set; }
public int64 readbytecount { get; set; }
list< listenparam > _listlistenport = new list< listenparam >();
public void addlistenport(int port, object tag)
{
_listlistenport.add(new listenparam(port, tag));
}
/// < summary >
///
/// </ summary >
/// < param name = "listenfault" >监听失败的端口</ param >
/// < returns ></ returns >
public bool startlisten(out list< int > listenfault)
{
_serverstart = true;
_clientpacketmanage = new clientpacketmanage(this);
_clientpacketmanage.onsocketpacketevent += putclientpacket;
_netconnectmanage.onsocketconnectevent += socketconnectevent;
_listlistener.clear();
thread thread1 = new thread(new threadstart(netpacketprocess));
thread1.start();
thread thread2 = new thread(new threadstart(netsendprocess));
thread2.start();
thread thread3 = new thread(new threadstart(netreadprocess));
thread3.start();
listenfault = new list< int >();
foreach (listenparam param in _listlistenport)
{
netlistener listener = new netlistener(this);
listener._listenparam = param;
listener.onacceptsocket += listener_onacceptsocket;
if (!listener.startlisten())
{
listenfault.add(param._port);
}
else
{
_listlistener.add(listener);
netlogger.log(string.format("监听成功!端口:{0}", param._port));
}
}
return listenfault.count == 0;
}
public void putclientpacket(socketeventparam param)
{
onsocketpacketevent?.invoke(param);
}
//获取包的最小长度
int _packetminlen;
int _packetmaxlen;
public int packetminlen
{
get { return _packetminlen; }
}
public int packetmaxlen
{
get { return _packetmaxlen; }
}
/// < summary >
/// 设置包的最小和最大长度
/// 当minlen=0时,认为是接收字节流
/// </ summary >
/// < param name = "minlen" ></ param >
/// < param name = "maxlen" ></ param >
public void setpacketparam(int minlen, int maxlen)
{
    // Debug-only sanity checks: 0 <= minlen < maxlen.
    debug.assert(0 <= minlen);
    debug.assert(minlen < maxlen);

    _packetminlen = minlen;
    _packetmaxlen = maxlen;
}
//获取包的总长度
public delegate int delegate_getpackettotallen(byte[] data, int offset);
public delegate_getpackettotallen getpackettotallen_callback;
objectpoolwithevent< socketeventparam > _socketeventpool = new objectpoolwithevent< socketeventparam >();
private void netpacketprocess()
{
while (_serverstart)
{
try
{
dealeventpool();
}
catch (exception ex)
{
netlogger.log(string.format("dealeventpool 异常 {0}***{1}", ex.message, ex.stacktrace));
}
_socketeventpool.waitone(1000);
}
}
dictionary< socket , asyncsocketclient> _clientgroup = new dictionary< socket , asyncsocketclient>();
public int clientcount
{
get
{
lock (_clientgroup)
{
return _clientgroup.count;
}
}
}
public list< socket > clientlist
{
get
{
lock (_clientgroup)
{
return _clientgroup.keys.tolist();
}
}
}
private void dealeventpool()
{
while (true)
{
socketeventparam param = _socketeventpool.getobj();
if (param == null)
return;
if (param.socketevent == en_socketevent.close)
{
lock (_clientgroup)
{
_clientgroup.remove(param.socket);
}
}
if (_packetminlen == 0)//字节流处理
{
onsocketpacketevent?.invoke(param);
}
else
{
//组成一个完整的包 逻辑
_clientpacketmanage.putsocketparam(param);
}
}
}
private void socketconnectevent(socketeventparam param, asyncsocketclient client)
{
try
{
if (param.socket == null || client == null) //连接失败
{
}
else
{
lock (_clientgroup)
{
bool remove = _clientgroup.remove(client.connectsocket);
debug.assert(!remove);
_clientgroup.add(client.connectsocket, client);
}
client.onsocketclose += client_onsocketclose;
client.onreaddata += client_onreaddata;
client.onsenddata += client_onsenddata;
_listreadevent.putobj(new socketeventdeal(client, en_socketdealevent.read));
}
_socketeventpool.putobj(param);
}
catch (exception ex)
{
netlogger.log(string.format("socketconnectevent 异常 {0}***{1}", ex.message, ex.stacktrace));
}
}
internal void onrcvpacketlenerror(socket socket, byte[] buffer, int offset, int packetlen)
{
try
{
lock (_clientgroup)
{
if (!_clientgroup.containskey(socket))
{
debug.assert(false);
return;
}
netlogger.log(string.format("报长度异常!包长:{0}", packetlen));
asyncsocketclient client = _clientgroup[socket];
client.closesocket();
}
}
catch (exception ex)
{
netlogger.log(string.format("onrcvpacketlenerror 异常 {0}***{1}", ex.message, ex.stacktrace));
}
}
#region listen port
private void listener_onacceptsocket(listenparam listenpatam, asyncsocketclient client)
{
try
{
lock (_clientgroup)
{
bool remove = _clientgroup.remove(client.connectsocket);
debug.assert(!remove);
_clientgroup.add(client.connectsocket, client);
}
client.onsocketclose += client_onsocketclose;
client.onreaddata += client_onreaddata;
client.onsenddata += client_onsenddata;
_listreadevent.putobj(new socketeventdeal(client, en_socketdealevent.read));
socketeventparam param = new socketeventparam(en_socketevent.accept, client.connectsocket);
param.clientinfo = client.clientinfo;
_socketeventpool.putobj(param);
}
catch (exception ex)
{
netlogger.log(string.format("listener_onacceptsocket 异常 {0}***{1}", ex.message, ex.stacktrace));
}
}
objectpoolwithevent< socketeventdeal > _listsendevent = new objectpoolwithevent< socketeventdeal >();
/// <summary>
/// Send worker loop: drains the send-event queue, then waits up to one second
/// for new work. Runs while <c>_serverstart</c> is true, matching
/// <c>netpacketprocess</c>; an exception in one pass is logged instead of
/// terminating the thread.
/// </summary>
private void netsendprocess()
{
    while (_serverstart)
    {
        try
        {
            dealsendevent();
        }
        catch (exception ex)
        {
            netlogger.log(string.format("dealsendevent 异常 {0}***{1}", ex.message, ex.stacktrace));
        }
        _listsendevent.waitone(1000);
    }
}
objectpoolwithevent< socketeventdeal > _listreadevent = new objectpoolwithevent< socketeventdeal >();
/// <summary>
/// Read worker loop: drains the read-event queue, then waits up to one second
/// for new work. Runs while <c>_serverstart</c> is true, matching
/// <c>netpacketprocess</c>; an exception in one pass is logged instead of
/// terminating the thread.
/// </summary>
private void netreadprocess()
{
    while (_serverstart)
    {
        try
        {
            dealreadevent();
        }
        catch (exception ex)
        {
            netlogger.log(string.format("dealreadevent 异常 {0}***{1}", ex.message, ex.stacktrace));
        }
        _listreadevent.waitone(1000);
    }
}
/// <summary>
/// Drains the send-event queue. For each send event the client is asked to
/// send repeatedly for as long as sends keep completing synchronously.
/// A read event in this queue is a programming error (debug assertion).
/// </summary>
private void dealsendevent()
{
    socketeventdeal item;
    while ((item = _listsendevent.getobj()) != null)
    {
        if (item.socketevent == en_socketdealevent.send)
        {
            while (item.client.sendnextdata() == en_socketsendresult.havesend)
            {
                // keep sending until the result is anything but havesend
            }
        }
        else if (item.socketevent == en_socketdealevent.read)
        {
            debug.assert(false);
        }
    }
}
/// <summary>
/// Drains the read-event queue. For each read event the client is asked to
/// read repeatedly for as long as reads keep completing synchronously.
/// A send event in this queue is a programming error (debug assertion).
/// </summary>
private void dealreadevent()
{
    socketeventdeal item;
    while ((item = _listreadevent.getobj()) != null)
    {
        if (item.socketevent == en_socketdealevent.read)
        {
            while (item.client.readnextdata() == en_socketreadresult.haveread)
            {
                // keep reading until the result is anything but haveread
            }
        }
        else if (item.socketevent == en_socketdealevent.send)
        {
            debug.assert(false);
        }
    }
}
private void client_onreaddata(asyncsocketclient client, byte[] readdata)
{
//读下一条
_listreadevent.putobj(new socketeventdeal(client, en_socketdealevent.read));
try
{
socketeventparam param = new socketeventparam(en_socketevent.read, client.connectsocket);
param.clientinfo = client.clientinfo;
param.data = readdata;
_socketeventpool.putobj(param);
lock (this)
{
readbytecount += readdata.length;
}
}
catch (exception ex)
{
netlogger.log(string.format("client_onreaddata 异常 {0}***{1}", ex.message, ex.stacktrace));
}
}
#endregion
private void client_onsenddata(asyncsocketclient client, int sendcount)
{
//发送下一条
_listsendevent.putobj(new socketeventdeal(client, en_socketdealevent.send));
lock (this)
{
sendbytecount += sendcount;
}
}
private void client_onsocketclose(asyncsocketclient client)
{
try
{
socketeventparam param = new socketeventparam(en_socketevent.close, client.connectsocket);
param.clientinfo = client.clientinfo;
_socketeventpool.putobj(param);
}
catch (exception ex)
{
netlogger.log(string.format("client_onsocketclose 异常 {0}***{1}", ex.message, ex.stacktrace));
}
}
/// < summary >
/// 放到发送缓冲
/// </ summary >
/// < param name = "socket" ></ param >
/// < param name = "data" ></ param >
/// < returns ></ returns >
public en_senddataresult senddata(socket socket, byte[] data)
{
    if (socket == null)
    {
        return en_senddataresult.no_client;
    }

    lock (_clientgroup)
    {
        asyncsocketclient target;
        if (!_clientgroup.trygetvalue(socket, out target))
        {
            return en_senddataresult.no_client;
        }

        en_senddataresult outcome = target.putsenddata(data);
        if (outcome == en_senddataresult.ok)
        {
            // Wake the send worker for this client.
            _listsendevent.putobj(new socketeventdeal(target, en_socketdealevent.send));
        }
        return outcome;
    }
}
/// < summary >
/// 设置某个连接的发送缓冲大小
/// </ summary >
/// < param name = "socket" ></ param >
/// < param name = "bytecount" ></ param >
/// < returns ></ returns >
public bool setclientsendbuffer(socket socket, int bytecount)
{
    // A null key makes the dictionary lookup throw; treat it as "unknown
    // client", the same way senddata does.
    if (socket == null)
    {
        return false;
    }
    lock (_clientgroup)
    {
        asyncsocketclient client;
        if (!_clientgroup.trygetvalue(socket, out client))
        {
            return false;
        }
        // The property itself raises values below 1024 to 1024.
        client.sendbufferbytecount = bytecount;
        return true;
    }
}
#region connect process
netconnectmanage _netconnectmanage = new netconnectmanage();
/// < summary >
/// 异步连接一个客户端
/// </ summary >
/// < param name = "peerip" ></ param >
/// < param name = "peerport" ></ param >
/// < param name = "tag" ></ param >
/// < returns ></ returns >
public bool connectasyn(string peerip, int peerport, object tag)
{
return _netconnectmanage.connectasyn(peerip, peerport, tag);
}
/// < summary >
/// 同步连接一个客户端
/// </ summary >
/// < param name = "peerip" ></ param >
/// < param name = "peerport" ></ param >
/// < param name = "tag" ></ param >
/// < param name = "socket" ></ param >
/// < returns ></ returns >
public bool connect(string peerip, int peerport, object tag, out socket socket)
{
return _netconnectmanage.connect(peerip, peerport, tag, out socket);
}
#endregion
}
enum en_socketdealevent
{
read,
send,
}
/// <summary>
/// Work item for the read/send worker queues: which client to service and
/// whether to read or send.
/// </summary>
class socketeventdeal
{
    /// <summary>Client the work item applies to.</summary>
    public asyncsocketclient client { get; set; }

    /// <summary>Kind of work requested.</summary>
    public en_socketdealevent socketevent { get; set; }

    public socketeventdeal(asyncsocketclient client, en_socketdealevent socketevent)
    {
        // The parameters have the same names as the properties, so an
        // unqualified assignment only assigns each parameter to itself and
        // leaves the properties unset. Qualify with "this".
        this.client = client;
        this.socketevent = socketevent;
    }
}
}
库的使用
使用起来非常简单,示例如下
using iocpcore;
using system;
using system.collections.generic;
using system.linq;
using system.net.sockets;
using system.text;
using system.threading.tasks;
using system.windows;
namespace warningclient
{
public class socketserver
{
public action< socketeventparam > onsocketevent;
public int64 sendbytecount
{
get
{
if (_netserver == null)
return 0;
return _netserver.sendbytecount;
}
}
public int64 readbytecount
{
get
{
if (_netserver == null)
return 0;
return _netserver.readbytecount;
}
}
netserver _netserver;
en_packettype _packettype = en_packettype.bytestream;
/// <summary>
/// Remembers the packet type and, when the server exists, configures its
/// packet limits: minimum length 0 for byte-stream mode, 9 otherwise;
/// maximum length 1024 in both cases.
/// </summary>
/// <param name="packettype">Packet framing mode to use.</param>
public void setpackttype(en_packettype packettype)
{
    _packettype = packettype;
    if (_netserver == null)
    {
        return;
    }
    int minlen = packettype == en_packettype.bytestream ? 0 : 9;
    _netserver.setpacketparam(minlen, 1024);
}
public bool init(list< int > listenport)
{
netlogger.onlogevent += netlogger_onlogevent;
_netserver = new netserver();
setpackttype(_packettype);
_netserver.getpackettotallen_callback += getpackettotallen;
_netserver.onsocketpacketevent += socketpacketdeal;
foreach (int n in listenport)
{
_netserver.addlistenport(n, n);
}
list< int > listenfault;
bool start = _netserver.startlisten(out listenfault);
return start;
}
/// <summary>
/// Length callback given to the server: dispatches to the length function of
/// the protocol currently selected in <c>mainwindow._packettype</c>.
/// </summary>
/// <param name="data">Buffer containing the packet.</param>
/// <param name="offset">Index in <paramref name="data"/> passed through to the protocol function.</param>
/// <returns>Total packet length as computed by the selected protocol function.</returns>
int getpackettotallen(byte[] data, int offset)
{
    return mainwindow._packettype == en_packettype.znss
        ? getpacketznss(data, offset)
        : getpacketanzhiyuan(data, offset);
}
/// <summary>
/// Total packet length: the byte at position 5 from <paramref name="offset"/>
/// plus 6.
/// </summary>
/// <param name="data">Buffer containing the packet.</param>
/// <param name="offset">Index the byte position is counted from.</param>
int getpacketanzhiyuan(byte[] data, int offset)
{
    return 6 + data[offset + 5];
}
/// <summary>
/// Total packet length: the byte at position 4 from <paramref name="offset"/>
/// plus 5.
/// </summary>
/// <param name="data">Buffer containing the packet.</param>
/// <param name="offset">Index the byte position is counted from.</param>
/// <returns>Total packet length in bytes.</returns>
int getpacketznss(byte[] data, int offset)
{
    // Index relative to offset, like getpacketanzhiyuan does; a fixed index
    // reads the wrong byte whenever the packet does not start at index 0.
    int packetlen = data[offset + 4] + 5;
    return packetlen;
}
public bool connectasyn(string peerip, int peerport, object tag)
{
return _netserver.connectasyn(peerip, peerport, tag);
}
public bool connect(string peerip, int peerport, object tag, out socket socket)
{
return _netserver.connect(peerip, peerport, tag, out socket);
}
private void netlogger_onlogevent(string message)
{
applog.log(message);
}
dictionary< socket , socketeventparam> _clientgroup = new dictionary< socket , socketeventparam>();
public int clientcount
{
get
{
lock (_clientgroup)
{
return _clientgroup.count;
}
}
}
public list< socket > clientlist
{
get
{
if (_netserver != null)
return _netserver.clientlist;
return new list< socket >();
}
}
void addclient(socketeventparam socketparam)
{
lock (_clientgroup)
{
_clientgroup.remove(socketparam.socket);
_clientgroup.add(socketparam.socket, socketparam);
}
}
void removeclient(socketeventparam socketparam)
{
lock (_clientgroup)
{
_clientgroup.remove(socketparam.socket);
}
}
objectpool< socketeventparam > _readdatapool = new objectpool< socketeventparam >();
public objectpool< socketeventparam > readdatapool
{
get
{
return _readdatapool;
}
}
private void socketpacketdeal(socketeventparam socketparam)
{
onsocketevent?.invoke(socketparam);
if (socketparam.socketevent == en_socketevent.read)
{
if (mainwindow._isshowreadpacket)
_readdatapool.putobj(socketparam);
}
else if (socketparam.socketevent == en_socketevent.accept)
{
addclient(socketparam);
string peerip = socketparam.clientinfo.peeripport;
applog.log(string.format("客户端链接!本地端口:{0},对端:{1}",
socketparam.clientinfo.localport, peerip));
}
else if (socketparam.socketevent == en_socketevent.connect)
{
string peerip = socketparam.clientinfo.peeripport;
if (socketparam.socket != null)
{
addclient(socketparam);
applog.log(string.format("连接对端成功!本地端口:{0},对端:{1}",
socketparam.clientinfo.localport, peerip));
}
else
{
applog.log(string.format("连接对端失败!本地端口:{0},对端:{1}",
socketparam.clientinfo.localport, peerip));
}
}
else if (socketparam.socketevent == en_socketevent.close)
{
mainwindow.mainwnd.onsocketdisconnect(socketparam.socket);
removeclient(socketparam);
string peerip = socketparam.clientinfo.peeripport;
applog.log(string.format("客户端断开!本地端口:{0},对端:{1},",
socketparam.clientinfo.localport, peerip));
}
}
public en_senddataresult senddata(socket socket, byte[] data)
{
if(socket == null)
{
messagebox.show("还没连接!");
return en_senddataresult.no_client;
}
return _netserver.senddata(socket, data);
}
internal void sendtoall(byte[] data)
{
lock (_clientgroup)
{
foreach (socket socket in _clientgroup.keys)
{
senddata(socket, data);
}
}
}
}
}
以上这篇c#中一个高性能异步socket封装库的实现思路分享就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持服务器之家。
原文链接:http://www.cnblogs.com/yuanchenhui/archive/2017/11/28/asyn_scoket.html
dy("nrwz");
查看更多关于C#中一个高性能异步socket封装库的实现思路分享的详细内容...