C#中一个高性能异步socket封装库的实现思路分享
前言
socket是软件之间通讯最常用的一种方式。c#实现socket通讯有很多中方法,其中效率最高就是异步通讯。
异步通讯实际是利用windows完成端口(IOCP)来处理的,关于完成端口实现原理,大家可以参考网上文章。
我这里想强调的是采用完成端口机制的异步通讯是windows下效率最高的通讯方式,没有之一!
异步通讯比同步通讯处理要难很多,代码编写中会遇到许多“坑“。如果没有经验,很难完成。
我搜集了大量资料,完成了对异步socket的封装。此库已用稳定高效的运行几个月。
纵观网上的资料,我还没有遇到一个满意的封装库。许多文章把数据收发和协议处理杂糅在一块,代码非常难懂,也无法扩展。
在编写该库时,避免以上缺陷。将逻辑处理层次化,模块化!同时实现了高可用性与高性能。
为了使大家对通讯效率有初步了解,先看测试图。
主机配置情况
百兆带宽基本占满,cpu占用40%,我的电脑在空闲时,cpu占用大概20%,也就是说程序占用cpu 20%左右。
这个库是可扩展的,就是说即使10万个连接,收发同样的数据,cpu占用基本相同。
库的结构图
目标
即可作为服务端(监听)也可以作为客户端(主动连接)使用。
可以适应任何网络协议。收发的数据针对字节流或一个完整的包。对协议内容不做处理。
高可用性。将复杂的底层处理封装,对外接口非常友好。
高性能。最大限度优化处理。单机可支持数万连接,收发速度可达几百兆bit。
实现思路
网络处理逻辑可以分为以下几个部分:
网络监听 可以在多个端口实现监听。负责生成socket,生成的socket供后续处理。监听模块功能比较单一,如有必要,可对监听模块做进一步优化。
主动连接 可以异步或同步的连接对方。连接成功后,对socket的后续处理,与监听得到的socket完全一样。注:无论是监听得到的socket,还是连接得到的socket,后续处理完全一样。
Socket收发处理 每个socket对应一个收发实例,socket收发只针对字节流处理。收发时,做了优化。比如发送时,对数据做了沾包,提高发送性能;接收时,一次投递1K的数据。
组包处理 一般数据包都有包长度指示;比如 报头的前俩个字节表示长度,根据这个值就可以组成一个完整的包。
NetListener 监听
using System; using System.Net; using System.Net.Sockets; using System.Threading; namespace IocpCore { class NetListener { private Socket listenSocket; public ListenParam _listenParam { get; set; } public event Action<ListenParam, AsyncSocketClient> OnAcceptSocket; bool start; NetServer _netServer; public NetListener(NetServer netServer) { _netServer = netServer; } public int _acceptAsyncCount = 0; public bool StartListen() { try { start = true; IPEndPoint listenPoint = new IPEndPoint(IPAddress.Parse("0.0.0.0"), _listenParam._port); listenSocket = new Socket(listenPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); listenSocket.Bind(listenPoint); listenSocket.Listen(200); Thread thread1 = new Thread(new ThreadStart(NetProcess)); thread1.Start(); StartAccept(); return true; } catch (Exception ex) { NetLogger.Log(string.Format("**监听异常!{0}", ex.Message)); return false; } } AutoResetEvent _acceptEvent = new AutoResetEvent(false); private void NetProcess() { while (start) { DealNewAccept(); _acceptEvent.WaitOne(1000 * 10); } } private void DealNewAccept() { try { if(_acceptAsyncCount <= 10) { StartAccept(); } while (true) { AsyncSocketClient client = _newSocketClientList.GetObj(); if (client == null) break; DealNewAccept(client); } } catch (Exception ex) { NetLogger.Log(string.Format("DealNewAccept 异常 {0}***{1}", ex.Message, ex.StackTrace)); } } private void DealNewAccept(AsyncSocketClient client) { client.SendBufferByteCount = _netServer.SendBufferBytePerClient; OnAcceptSocket?.Invoke(_listenParam, client); } private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs acceptEventArgs) { try { Interlocked.Decrement(ref _acceptAsyncCount); _acceptEvent.Set(); acceptEventArgs.Completed -= AcceptEventArg_Completed; ProcessAccept(acceptEventArgs); } catch (Exception ex) { NetLogger.Log(string.Format("AcceptEventArg_Completed {0}***{1}", ex.Message, ex.StackTrace)); } } public bool StartAccept() { SocketAsyncEventArgs acceptEventArgs = new SocketAsyncEventArgs(); acceptEventArgs.Completed += AcceptEventArg_Completed; bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArgs); Interlocked.Increment(ref _acceptAsyncCount); if (!willRaiseEvent) { Interlocked.Decrement(ref _acceptAsyncCount); _acceptEvent.Set(); acceptEventArgs.Completed -= AcceptEventArg_Completed; ProcessAccept(acceptEventArgs); } return true; } ObjectPool<AsyncSocketClient> _newSocketClientList = new ObjectPool<AsyncSocketClient>(); private void ProcessAccept(SocketAsyncEventArgs acceptEventArgs) { try { using (acceptEventArgs) { if (acceptEventArgs.AcceptSocket != null) { AsyncSocketClient client = new AsyncSocketClient(acceptEventArgs.AcceptSocket); client.CreateClientInfo(this); _newSocketClientList.PutObj(client); _acceptEvent.Set(); } } } catch (Exception ex) { NetLogger.Log(string.Format("ProcessAccept {0}***{1}", ex.Message, ex.StackTrace)); } } } }
NetConnectManage连接处理
using System; using System.Net; using System.Net.Sockets; namespace IocpCore { class NetConnectManage { public event Action<SocketEventParam, AsyncSocketClient> OnSocketConnectEvent; public bool ConnectAsyn(string peerIp, int peerPort, object tag) { try { Socket socket = new Socket(SocketType.Stream, ProtocolType.Tcp); SocketAsyncEventArgs socketEventArgs = new SocketAsyncEventArgs(); socketEventArgs.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(peerIp), peerPort); socketEventArgs.Completed += SocketConnect_Completed; SocketClientInfo clientInfo = new SocketClientInfo(); socketEventArgs.UserToken = clientInfo; clientInfo.PeerIp = peerIp; clientInfo.PeerPort = peerPort; clientInfo.Tag = tag; bool willRaiseEvent = socket.ConnectAsync(socketEventArgs); if (!willRaiseEvent) { ProcessConnect(socketEventArgs); socketEventArgs.Completed -= SocketConnect_Completed; socketEventArgs.Dispose(); } return true; } catch (Exception ex) { NetLogger.Log("ConnectAsyn",ex); return false; } } private void SocketConnect_Completed(object sender, SocketAsyncEventArgs socketEventArgs) { ProcessConnect(socketEventArgs); socketEventArgs.Completed -= SocketConnect_Completed; socketEventArgs.Dispose(); } private void ProcessConnect(SocketAsyncEventArgs socketEventArgs) { SocketClientInfo clientInfo = socketEventArgs.UserToken as SocketClientInfo; if (socketEventArgs.SocketError == SocketError.Success) { DealConnectSocket(socketEventArgs.ConnectSocket, clientInfo); } else { SocketEventParam socketParam = new SocketEventParam(EN_SocketEvent.connect, null); socketParam.ClientInfo = clientInfo; OnSocketConnectEvent?.Invoke(socketParam, null); } } void DealConnectSocket(Socket socket, SocketClientInfo clientInfo) { clientInfo.SetClientInfo(socket); AsyncSocketClient client = new AsyncSocketClient(socket); client.SetClientInfo(clientInfo); //触发事件 SocketEventParam socketParam = new SocketEventParam(EN_SocketEvent.connect, socket); socketParam.ClientInfo = clientInfo; OnSocketConnectEvent?.Invoke(socketParam, client); } public bool Connect(string peerIp, int peerPort, object tag, out Socket socket) { socket = null; try { Socket socketTmp = new Socket(SocketType.Stream, ProtocolType.Tcp); SocketClientInfo clientInfo = new SocketClientInfo(); clientInfo.PeerIp = peerIp; clientInfo.PeerPort = peerPort; clientInfo.Tag = tag; EndPoint remoteEP = new IPEndPoint(IPAddress.Parse(peerIp), peerPort); socketTmp.Connect(remoteEP); if (!socketTmp.Connected) return false; DealConnectSocket(socketTmp, clientInfo); socket = socketTmp; return true; } catch (Exception ex) { NetLogger.Log(string.Format("连接对方:({0}:{1})出错!", peerIp, peerPort), ex); return false; } } } }
AsyncSocketClient socket收发处理
using System; using System.Collections.Generic; using System.Diagnostics; using System.Net; using System.Net.Sockets; namespace IocpCore { public class AsyncSocketClient { public static int IocpReadLen = 1024; public readonly Socket ConnectSocket; protected SocketAsyncEventArgs m_receiveEventArgs; public SocketAsyncEventArgs ReceiveEventArgs { get { return m_receiveEventArgs; } set { m_receiveEventArgs = value; } } protected byte[] m_asyncReceiveBuffer; protected SocketAsyncEventArgs m_sendEventArgs; public SocketAsyncEventArgs SendEventArgs { get { return m_sendEventArgs; } set { m_sendEventArgs = value; } } protected byte[] m_asyncSendBuffer; public event Action<AsyncSocketClient, byte[]> OnReadData; public event Action<AsyncSocketClient, int> OnSendData; public event Action<AsyncSocketClient> OnSocketClose; static object releaseLock = new object(); public static int createCount = 0; public static int releaseCount = 0; ~AsyncSocketClient() { lock (releaseLock) { releaseCount++; } } public AsyncSocketClient(Socket socket) { lock (releaseLock) { createCount++; } ConnectSocket = socket; m_receiveEventArgs = new SocketAsyncEventArgs(); m_asyncReceiveBuffer = new byte[IocpReadLen]; m_receiveEventArgs.AcceptSocket = ConnectSocket; m_receiveEventArgs.Completed += ReceiveEventArgs_Completed; m_sendEventArgs = new SocketAsyncEventArgs(); m_asyncSendBuffer = new byte[IocpReadLen * 2]; m_sendEventArgs.AcceptSocket = ConnectSocket; m_sendEventArgs.Completed += SendEventArgs_Completed; } SocketClientInfo _clientInfo; public SocketClientInfo ClientInfo { get { return _clientInfo; } } internal void CreateClientInfo(NetListener netListener) { _clientInfo = new SocketClientInfo(); try { _clientInfo.Tag = netListener._listenParam._tag; IPEndPoint ip = ConnectSocket.LocalEndPoint as IPEndPoint; Debug.Assert(netListener._listenParam._port == ip.Port); _clientInfo.LocalIp = ip.Address.ToString(); _clientInfo.LocalPort = netListener._listenParam._port; ip = ConnectSocket.RemoteEndPoint as IPEndPoint; _clientInfo.PeerIp = ip.Address.ToString(); _clientInfo.PeerPort = ip.Port; } catch (Exception ex) { NetLogger.Log("CreateClientInfo", ex); } } internal void SetClientInfo(SocketClientInfo clientInfo) { _clientInfo = clientInfo; } #region read process bool _inReadPending = false; public EN_SocketReadResult ReadNextData() { lock (this) { if (_socketError) return EN_SocketReadResult.ReadError; if (_inReadPending) return EN_SocketReadResult.InAsyn; if(!ConnectSocket.Connected) { OnReadError(); return EN_SocketReadResult.ReadError; } try { m_receiveEventArgs.SetBuffer(m_asyncReceiveBuffer, 0, m_asyncReceiveBuffer.Length); _inReadPending = true; bool willRaiseEvent = ConnectSocket.ReceiveAsync(ReceiveEventArgs); //投递接收请求 if (!willRaiseEvent) { _inReadPending = false; ProcessReceive(); if (_socketError) { OnReadError(); return EN_SocketReadResult.ReadError; } return EN_SocketReadResult.HaveRead; } else { return EN_SocketReadResult.InAsyn; } } catch (Exception ex) { NetLogger.Log("ReadNextData", ex); _inReadPending = false; OnReadError(); return EN_SocketReadResult.ReadError; } } } private void ProcessReceive() { if (ReceiveEventArgs.BytesTransferred > 0 && ReceiveEventArgs.SocketError == SocketError.Success) { int offset = ReceiveEventArgs.Offset; int count = ReceiveEventArgs.BytesTransferred; byte[] readData = new byte[count]; Array.Copy(m_asyncReceiveBuffer, offset, readData, 0, count); _inReadPending = false; if (!_socketError) OnReadData?.Invoke(this, readData); } else { _inReadPending = false; OnReadError(); } } private void ReceiveEventArgs_Completed(object sender, SocketAsyncEventArgs e) { lock (this) { _inReadPending = false; ProcessReceive(); if (_socketError) { OnReadError(); } } } bool _socketError = false; private void OnReadError() { lock (this) { if (_socketError == false) { _socketError = true; OnSocketClose?.Invoke(this); } CloseClient(); } } #endregion #region send process int _sendBufferByteCount = 102400; public int SendBufferByteCount { get { return _sendBufferByteCount; } set { if (value < 1024) { _sendBufferByteCount = 1024; } else { _sendBufferByteCount = value; } } } SendBufferPool _sendDataPool = new SendBufferPool(); internal EN_SendDataResult PutSendData(byte[] data) { if (_socketError) return EN_SendDataResult.no_client; if (_sendDataPool._bufferByteCount >= _sendBufferByteCount) { return EN_SendDataResult.buffer_overflow; } if (data.Length <= IocpReadLen) { _sendDataPool.PutObj(data); } else { List<byte[]> dataItems = SplitData(data, IocpReadLen); foreach (byte[] item in dataItems) { _sendDataPool.PutObj(item); } } return EN_SendDataResult.ok; } bool _inSendPending = false; public EN_SocketSendResult SendNextData() { lock (this) { if (_socketError) { return EN_SocketSendResult.SendError; } if (_inSendPending) { return EN_SocketSendResult.InAsyn; } int sendByteCount = GetSendData(); if (sendByteCount == 0) { return EN_SocketSendResult.NoSendData; } //防止抛出异常,否则影响性能 if (!ConnectSocket.Connected) { OnSendError(); return EN_SocketSendResult.SendError; } try { m_sendEventArgs.SetBuffer(m_asyncSendBuffer, 0, sendByteCount); _inSendPending = true; bool willRaiseEvent = ConnectSocket.SendAsync(m_sendEventArgs); if (!willRaiseEvent) { _inSendPending = false; ProcessSend(m_sendEventArgs); if (_socketError) { OnSendError(); return EN_SocketSendResult.SendError; } else { OnSendData?.Invoke(this, sendByteCount); //继续发下一条 return EN_SocketSendResult.HaveSend; } } else { return EN_SocketSendResult.InAsyn; } } catch (Exception ex) { NetLogger.Log("SendNextData", ex); _inSendPending = false; OnSendError(); return EN_SocketSendResult.SendError; } } } private void SendEventArgs_Completed(object sender, SocketAsyncEventArgs sendEventArgs) { lock (this) { try { _inSendPending = false; ProcessSend(m_sendEventArgs); int sendCount = 0; if (sendEventArgs.SocketError == SocketError.Success) { sendCount = sendEventArgs.BytesTransferred; } OnSendData?.Invoke(this, sendCount); if (_socketError) { OnSendError(); } } catch (Exception ex) { NetLogger.Log("SendEventArgs_Completed", ex); } } } private bool ProcessSend(SocketAsyncEventArgs sendEventArgs) { if (sendEventArgs.SocketError == SocketError.Success) { return true; } else { OnSendError(); return false; } } private int GetSendData() { int dataLen = 0; while (true) { byte[] data = _sendDataPool.GetObj(); if (data == null) return dataLen; Array.Copy(data, 0, m_asyncSendBuffer, dataLen, data.Length); dataLen += data.Length; if (dataLen > IocpReadLen) break; } return dataLen; } private void OnSendError() { lock (this) { if (_socketError == false) { _socketError = true; OnSocketClose?.Invoke(this); } CloseClient(); } } #endregion internal void CloseSocket() { try { ConnectSocket.Close(); } catch (Exception ex) { NetLogger.Log("CloseSocket", ex); } } static object socketCloseLock = new object(); public static int closeSendCount = 0; public static int closeReadCount = 0; bool _disposeSend = false; void CloseSend() { if (!_disposeSend && !_inSendPending) { lock (socketCloseLock) closeSendCount++; _disposeSend = true; m_sendEventArgs.SetBuffer(null, 0, 0); m_sendEventArgs.Completed -= SendEventArgs_Completed; m_sendEventArgs.Dispose(); } } bool _disposeRead = false; void CloseRead() { if (!_disposeRead && !_inReadPending) { lock (socketCloseLock) closeReadCount++; _disposeRead = true; m_receiveEventArgs.SetBuffer(null, 0, 0); m_receiveEventArgs.Completed -= ReceiveEventArgs_Completed; m_receiveEventArgs.Dispose(); } } private void CloseClient() { try { CloseSend(); CloseRead(); ConnectSocket.Close(); } catch (Exception ex) { NetLogger.Log("CloseClient", ex); } } //发送缓冲大小 private List<byte[]> SplitData(byte[] data, int maxLen) { List<byte[]> items = new List<byte[]>(); int start = 0; while (true) { int itemLen = Math.Min(maxLen, data.Length - start); if (itemLen == 0) break; byte[] item = new byte[itemLen]; Array.Copy(data, start, item, 0, itemLen); items.Add(item); start += itemLen; } return items; } } public enum EN_SocketReadResult { InAsyn, HaveRead, ReadError } public enum EN_SocketSendResult { InAsyn, HaveSend, NoSendData, SendError } class SendBufferPool { ObjectPool<byte[]> _bufferPool = new ObjectPool<byte[]>(); public Int64 _bufferByteCount = 0; public bool PutObj(byte[] obj) { if (_bufferPool.PutObj(obj)) { lock (this) { _bufferByteCount += obj.Length; } return true; } else { return false; } } public byte[] GetObj() { byte[] result = _bufferPool.GetObj(); if (result != null) { lock (this) { _bufferByteCount -= result.Length; } } return result; } } }
NetServer 聚合其他类
using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Net.Sockets; using System.Threading; namespace IocpCore { public class NetServer { public Action<SocketEventParam> OnSocketPacketEvent; //每个连接发送缓冲大小 public int SendBufferBytePerClient { get; set; } = 1024 * 100; bool _serverStart = false; List<NetListener> _listListener = new List<NetListener>(); //负责对收到的字节流 组成完成的包 ClientPacketManage _clientPacketManage; public Int64 SendByteCount { get; set; } public Int64 ReadByteCount { get; set; } List<ListenParam> _listListenPort = new List<ListenParam>(); public void AddListenPort(int port, object tag) { _listListenPort.Add(new ListenParam(port, tag)); } /// <summary> /// /// </summary> /// <param name="listenFault">监听失败的端口</param> /// <returns></returns> public bool StartListen(out List<int> listenFault) { _serverStart = true; _clientPacketManage = new ClientPacketManage(this); _clientPacketManage.OnSocketPacketEvent += PutClientPacket; _netConnectManage.OnSocketConnectEvent += SocketConnectEvent; _listListener.Clear(); Thread thread1 = new Thread(new ThreadStart(NetPacketProcess)); thread1.Start(); Thread thread2 = new Thread(new ThreadStart(NetSendProcess)); thread2.Start(); Thread thread3 = new Thread(new ThreadStart(NetReadProcess)); thread3.Start(); listenFault = new List<int>(); foreach (ListenParam param in _listListenPort) { NetListener listener = new NetListener(this); listener._listenParam = param; listener.OnAcceptSocket += Listener_OnAcceptSocket; if (!listener.StartListen()) { listenFault.Add(param._port); } else { _listListener.Add(listener); NetLogger.Log(string.Format("监听成功!端口:{0}", param._port)); } } return listenFault.Count == 0; } public void PutClientPacket(SocketEventParam param) { OnSocketPacketEvent?.Invoke(param); } //获取包的最小长度 int _packetMinLen; int _packetMaxLen; public int PacketMinLen { get { return _packetMinLen; } } public int PacketMaxLen { get { return _packetMaxLen; } } /// <summary> /// 设置包的最小和最大长度 /// 当minLen=0时,认为是接收字节流 /// </summary> /// <param name="minLen"></param> /// <param name="maxLen"></param> public void SetPacketParam(int minLen, int maxLen) { Debug.Assert(minLen >= 0); Debug.Assert(maxLen > minLen); _packetMinLen = minLen; _packetMaxLen = maxLen; } //获取包的总长度 public delegate int delegate_GetPacketTotalLen(byte[] data, int offset); public delegate_GetPacketTotalLen GetPacketTotalLen_Callback; ObjectPoolWithEvent<SocketEventParam> _socketEventPool = new ObjectPoolWithEvent<SocketEventParam>(); private void NetPacketProcess() { while (_serverStart) { try { DealEventPool(); } catch (Exception ex) { NetLogger.Log(string.Format("DealEventPool 异常 {0}***{1}", ex.Message, ex.StackTrace)); } _socketEventPool.WaitOne(1000); } } Dictionary<Socket, AsyncSocketClient> _clientGroup = new Dictionary<Socket, AsyncSocketClient>(); public int ClientCount { get { lock (_clientGroup) { return _clientGroup.Count; } } } public List<Socket> ClientList { get { lock (_clientGroup) { return _clientGroup.Keys.ToList(); } } } private void DealEventPool() { while (true) { SocketEventParam param = _socketEventPool.GetObj(); if (param == null) return; if (param.SocketEvent == EN_SocketEvent.close) { lock (_clientGroup) { _clientGroup.Remove(param.Socket); } } if (_packetMinLen == 0)//字节流处理 { OnSocketPacketEvent?.Invoke(param); } else { //组成一个完整的包 逻辑 _clientPacketManage.PutSocketParam(param); } } } private void SocketConnectEvent(SocketEventParam param, AsyncSocketClient client) { try { if (param.Socket == null || client == null) //连接失败 { } else { lock (_clientGroup) { bool remove = _clientGroup.Remove(client.ConnectSocket); Debug.Assert(!remove); _clientGroup.Add(client.ConnectSocket, client); } client.OnSocketClose += Client_OnSocketClose; client.OnReadData += Client_OnReadData; client.OnSendData += Client_OnSendData; _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read)); } _socketEventPool.PutObj(param); } catch (Exception ex) { NetLogger.Log(string.Format("SocketConnectEvent 异常 {0}***{1}", ex.Message, ex.StackTrace)); } } internal void OnRcvPacketLenError(Socket socket, byte[] buffer, int offset, int packetLen) { try { lock (_clientGroup) { if (!_clientGroup.ContainsKey(socket)) { Debug.Assert(false); return; } NetLogger.Log(string.Format("报长度异常!包长:{0}", packetLen)); AsyncSocketClient client = _clientGroup[socket]; client.CloseSocket(); } } catch (Exception ex) { NetLogger.Log(string.Format("OnRcvPacketLenError 异常 {0}***{1}", ex.Message, ex.StackTrace)); } } #region listen port private void Listener_OnAcceptSocket(ListenParam listenPatam, AsyncSocketClient client) { try { lock (_clientGroup) { bool remove = _clientGroup.Remove(client.ConnectSocket); Debug.Assert(!remove); _clientGroup.Add(client.ConnectSocket, client); } client.OnSocketClose += Client_OnSocketClose; client.OnReadData += Client_OnReadData; client.OnSendData += Client_OnSendData; _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read)); SocketEventParam param = new SocketEventParam(EN_SocketEvent.accept, client.ConnectSocket); param.ClientInfo = client.ClientInfo; _socketEventPool.PutObj(param); } catch (Exception ex) { NetLogger.Log(string.Format("Listener_OnAcceptSocket 异常 {0}***{1}", ex.Message, ex.StackTrace)); } } ObjectPoolWithEvent<SocketEventDeal> _listSendEvent = new ObjectPoolWithEvent<SocketEventDeal>(); private void NetSendProcess() { while (true) { DealSendEvent(); _listSendEvent.WaitOne(1000); } } ObjectPoolWithEvent<SocketEventDeal> _listReadEvent = new ObjectPoolWithEvent<SocketEventDeal>(); private void NetReadProcess() { while (true) { DealReadEvent(); _listReadEvent.WaitOne(1000); } } private void DealSendEvent() { while (true) { SocketEventDeal item = _listSendEvent.GetObj(); if (item == null) break; switch (item.SocketEvent) { case EN_SocketDealEvent.send: { while (true) { EN_SocketSendResult result = item.Client.SendNextData(); if (result == EN_SocketSendResult.HaveSend) continue; else break; } } break; case EN_SocketDealEvent.read: { Debug.Assert(false); } break; } } } private void DealReadEvent() { while (true) { SocketEventDeal item = _listReadEvent.GetObj(); if (item == null) break; switch (item.SocketEvent) { case EN_SocketDealEvent.read: { while (true) { EN_SocketReadResult result = item.Client.ReadNextData(); if (result == EN_SocketReadResult.HaveRead) continue; else break; } } break; case EN_SocketDealEvent.send: { Debug.Assert(false); } break; } } } private void Client_OnReadData(AsyncSocketClient client, byte[] readData) { //读下一条 _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read)); try { SocketEventParam param = new SocketEventParam(EN_SocketEvent.read, client.ConnectSocket); param.ClientInfo = client.ClientInfo; param.Data = readData; _socketEventPool.PutObj(param); lock (this) { ReadByteCount += readData.Length; } } catch (Exception ex) { NetLogger.Log(string.Format("Client_OnReadData 异常 {0}***{1}", ex.Message, ex.StackTrace)); } } #endregion private void Client_OnSendData(AsyncSocketClient client, int sendCount) { //发送下一条 _listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send)); lock (this) { SendByteCount += sendCount; } } private void Client_OnSocketClose(AsyncSocketClient client) { try { SocketEventParam param = new SocketEventParam(EN_SocketEvent.close, client.ConnectSocket); param.ClientInfo = client.ClientInfo; _socketEventPool.PutObj(param); } catch (Exception ex) { NetLogger.Log(string.Format("Client_OnSocketClose 异常 {0}***{1}", ex.Message, ex.StackTrace)); } } /// <summary> /// 放到发送缓冲 /// </summary> /// <param name="socket"></param> /// <param name="data"></param> /// <returns></returns> public EN_SendDataResult SendData(Socket socket, byte[] data) { if (socket == null) return EN_SendDataResult.no_client; lock (_clientGroup) { if (!_clientGroup.ContainsKey(socket)) return EN_SendDataResult.no_client; AsyncSocketClient client = _clientGroup[socket]; EN_SendDataResult result = client.PutSendData(data); if (result == EN_SendDataResult.ok) { //发送下一条 _listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send)); } return result; } } /// <summary> /// 设置某个连接的发送缓冲大小 /// </summary> /// <param name="socket"></param> /// <param name="byteCount"></param> /// <returns></returns> public bool SetClientSendBuffer(Socket socket, int byteCount) { lock (_clientGroup) { if (!_clientGroup.ContainsKey(socket)) return false; AsyncSocketClient client = _clientGroup[socket]; client.SendBufferByteCount = byteCount; return true; } } #region connect process NetConnectManage _netConnectManage = new NetConnectManage(); /// <summary> /// 异步连接一个客户端 /// </summary> /// <param name="peerIp"></param> /// <param name="peerPort"></param> /// <param name="tag"></param> /// <returns></returns> public bool ConnectAsyn(string peerIp, int peerPort, object tag) { return _netConnectManage.ConnectAsyn(peerIp, peerPort, tag); } /// <summary> /// 同步连接一个客户端 /// </summary> /// <param name="peerIp"></param> /// <param name="peerPort"></param> /// <param name="tag"></param> /// <param name="socket"></param> /// <returns></returns> public bool Connect(string peerIp, int peerPort, object tag, out Socket socket) { return _netConnectManage.Connect(peerIp, peerPort, tag, out socket); } #endregion } enum EN_SocketDealEvent { read, send, } class SocketEventDeal { public AsyncSocketClient Client { get; set; } public EN_SocketDealEvent SocketEvent { get; set; } public SocketEventDeal(AsyncSocketClient client, EN_SocketDealEvent socketEvent) { Client = client; SocketEvent = socketEvent; } } }
库的使用
使用起来非常简单,示例如下
using IocpCore; using System; using System.Collections.Generic; using System.Linq; using System.Net.Sockets; using System.Text; using System.Threading.Tasks; using System.Windows; namespace WarningClient { public class SocketServer { public Action<SocketEventParam> OnSocketEvent; public Int64 SendByteCount { get { if (_netServer == null) return 0; return _netServer.SendByteCount; } } public Int64 ReadByteCount { get { if (_netServer == null) return 0; return _netServer.ReadByteCount; } } NetServer _netServer; EN_PacketType _packetType = EN_PacketType.byteStream; public void SetPacktType(EN_PacketType packetType) { _packetType = packetType; if (_netServer == null) return; if (packetType == EN_PacketType.byteStream) { _netServer.SetPacketParam(0, 1024); } else { _netServer.SetPacketParam(9, 1024); } } public bool Init(List<int> listenPort) { NetLogger.OnLogEvent += NetLogger_OnLogEvent; _netServer = new NetServer(); SetPacktType(_packetType); _netServer.GetPacketTotalLen_Callback += GetPacketTotalLen; _netServer.OnSocketPacketEvent += SocketPacketDeal; foreach (int n in listenPort) { _netServer.AddListenPort(n, n); } List<int> listenFault; bool start = _netServer.StartListen(out listenFault); return start; } int GetPacketTotalLen(byte[] data, int offset) { if (MainWindow._packetType == EN_PacketType.znss) return GetPacketZnss(data, offset); else return GetPacketAnzhiyuan(data, offset); } int GetPacketAnzhiyuan(byte[] data, int offset) { int n = data[offset + 5] + 6; return n; } int GetPacketZnss(byte[] data, int offset) { int packetLen = (int)(data[4]) + 5; return packetLen; } public bool ConnectAsyn(string peerIp, int peerPort, object tag) { return _netServer.ConnectAsyn(peerIp, peerPort, tag); } public bool Connect(string peerIp, int peerPort, object tag, out Socket socket) { return _netServer.Connect(peerIp, peerPort, tag, out socket); } private void NetLogger_OnLogEvent(string message) { AppLog.Log(message); } Dictionary<Socket, SocketEventParam> _clientGroup = new Dictionary<Socket, SocketEventParam>(); public int ClientCount { get { lock (_clientGroup) { return _clientGroup.Count; } } } public List<Socket> ClientList { get { if (_netServer != null) return _netServer.ClientList; return new List<Socket>(); } } void AddClient(SocketEventParam socketParam) { lock (_clientGroup) { _clientGroup.Remove(socketParam.Socket); _clientGroup.Add(socketParam.Socket, socketParam); } } void RemoveClient(SocketEventParam socketParam) { lock (_clientGroup) { _clientGroup.Remove(socketParam.Socket); } } ObjectPool<SocketEventParam> _readDataPool = new ObjectPool<SocketEventParam>(); public ObjectPool<SocketEventParam> ReadDataPool { get { return _readDataPool; } } private void SocketPacketDeal(SocketEventParam socketParam) { OnSocketEvent?.Invoke(socketParam); if (socketParam.SocketEvent == EN_SocketEvent.read) { if (MainWindow._isShowReadPacket) _readDataPool.PutObj(socketParam); } else if (socketParam.SocketEvent == EN_SocketEvent.accept) { AddClient(socketParam); string peerIp = socketParam.ClientInfo.PeerIpPort; AppLog.Log(string.Format("客户端链接!本地端口:{0},对端:{1}", socketParam.ClientInfo.LocalPort, peerIp)); } else if (socketParam.SocketEvent == EN_SocketEvent.connect) { string peerIp = socketParam.ClientInfo.PeerIpPort; if (socketParam.Socket != null) { AddClient(socketParam); AppLog.Log(string.Format("连接对端成功!本地端口:{0},对端:{1}", socketParam.ClientInfo.LocalPort, peerIp)); } else { AppLog.Log(string.Format("连接对端失败!本地端口:{0},对端:{1}", socketParam.ClientInfo.LocalPort, peerIp)); } } else if (socketParam.SocketEvent == EN_SocketEvent.close) { MainWindow.MainWnd.OnSocketDisconnect(socketParam.Socket); RemoveClient(socketParam); string peerIp = socketParam.ClientInfo.PeerIpPort; AppLog.Log(string.Format("客户端断开!本地端口:{0},对端:{1},", socketParam.ClientInfo.LocalPort, peerIp)); } } public EN_SendDataResult SendData(Socket socket, byte[] data) { if(socket == null) { MessageBox.Show("还没连接!"); return EN_SendDataResult.no_client; } return _netServer.SendData(socket, data); } internal void SendToAll(byte[] data) { lock (_clientGroup) { foreach (Socket socket in _clientGroup.Keys) { SendData(socket, data); } } } } }
上一篇:C#实现经典飞行棋游戏的示例代码
栏 目:.NET代码
下一篇:C# Winfrom实现Skyline画直线功能的示例代码
本文标题:C#中一个高性能异步socket封装库的实现思路分享
本文地址:http://www.codeinn.net/misctech/212149.html