协议扩展
前两章节对Proto的基本使用做了大概介绍,这一节来解决下Protobuf在运用过程中的主要问题——协议的映射。当客户端的数据传输到服务端后,服务端如何知道用什么协议来反序列化数据流呢。
首先我们来看用proto文件生成的ProtoC#脚本。对应可以发现,Proto文件中的一个message结构,转换成C#脚本就是一个Class。接下来敲重点,该Class使用了partial
特性,该特性可以让一个完整类分成多个部分写在不同的文件里,最终C#编译时合并成一个完整类。
有这一特性我们就可以做很多事情了,我们可以对Proto生成的C#脚本进行扩展。
接下来的操作大概是这样:
根据Proto中定义的协议,编写协议对应的Class(这一步可以写程序来代替)。编写的Class增加特性,在特性中给该类指定唯一ID。在游戏框架启动时检索程序集中的所有该特性的Type,将Type和对应的唯一ID保存到字典中。传输消息时传输(协议ID+协议)的组合,那么服务端收到消息后就可以通过协议ID在字典中检索到协议类型,然后使用协议类型反序列化成协议。这样服务端就收到数据了。
Socket选择
现在Socket的使用方式分多种:同步Socket、异步Socket、多路复用Select、SocketAsyncEventArgs
同步Socket
当主线程在发送消息时会被阻塞,例如服务端发送一个登录请求给服务器,服务器处理完再下发给客户端,这一段时间的客户端主线程都是被阻塞的,啥事都干不了。所以同步Socket基本上都是一个演示,在实际应用中,我们都不会使用同步Socket。
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Text;
using UnityEngine;
/// 用于从远程设备接收数据的状态对象。
// State object for receiving data from remote device.
public class StateObject
{
// Client socket.
public Socket workSocket = null;
// Size of receive buffer.
public const int BufferSize = 256;
//接收数据的Buffer
// Receive buffer.
public byte[] buffer = new byte[BufferSize];
// 接收的数据字符串
// Received data string.
public StringBuilder sb = new StringBuilder();
}
public class CLient : MonoBehaviour
{
public void Start()
{
StartClient();
}
//远端设备端口号。
// The port number for the remote device.
private const int port = 11000;
//ManualResetEvent 线程信号
// ManualResetEvent instances signal completion.
private ManualResetEvent connectDone = new ManualResetEvent(false);
private ManualResetEvent sendDone = new ManualResetEvent(false);
private ManualResetEvent receiveDone = new ManualResetEvent(false);
//来自远程设备的答复。
// The response from the remote device.
private String response = String.Empty;
private void StartClient()
{
// 连接到远端设备。
// Connect to a remote device.
try
{
//为套接字建立远程端点。
//远端设备的名称为“host.contoso.com”。
// Establish the remote endpoint for the socket.
// The name of the remote device is "host.contoso.com".
IPHostEntry ipHostInfo = Dns.GetHostEntry("192.168.1.6");
IPAddress ipAddress = ipHostInfo.AddressList[0];
IPEndPoint remoteEP = new IPEndPoint(ipAddress, port);
//创建TCP/IP套接字。
// Create a TCP/IP socket.
Socket client = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
try
{
//连接到远程端口。
// Connect to the remote endpoint.
client.BeginConnect(remoteEP, new AsyncCallback(ConnectCallback), client);
//将线程设为阻塞
connectDone.WaitOne();
}
catch (Exception e)
{
connectDone.Set();
Log.Error(e);
return;
}
// 将测试数据发送到远端设备。
// Send test data to the remote device.
Send(client, "This is a test<EOF>");
sendDone.WaitOne();
// 接收来自远程设备的响应。
// Receive the response from the remote device.
Receive(client);
receiveDone.WaitOne();
// 将响应写入控制台。
// Write the response to the console.
Log.Debug($"Response received : {response}");
// 释放套接字。
// Release the socket.
client.Shutdown(SocketShutdown.Both);
client.Close();
}
catch (Exception e)
{
Log.Error(e.ToString());
}
}
/// <summary>
/// 连接回调
/// </summary>
/// <param name="ar"></param>
private void ConnectCallback(IAsyncResult ar)
{
try
{
// 从状态对象获取套接字。
// Retrieve the socket from the state object.
Socket client = (Socket)ar.AsyncState;
// 完成连接
// Complete the connection.
client.EndConnect(ar);
Log.Debug($"Socket connected to {client.RemoteEndPoint.ToString()}");
// 发出连接已建立的信号。
// Signal that the connection has been made.
connectDone.Set();
}
catch (Exception e)
{
Log.Error(e.ToString());
}
}
/// <summary>
/// 接收数据
/// </summary>
/// <param name="client"></param>
private void Receive(Socket client)
{
try
{
// 创建状态对象。
// Create the state object.
StateObject state = new StateObject();
state.workSocket = client;
// 开始从远端设备接收数据。
// Begin receiving the data from the remote device.
// byte[] 开始位置 接受的数量 枚举\默认0 接收完成回调 回调的参数
client.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReceiveCallback), state);
}
catch (Exception e)
{
Log.Error(e.ToString());
}
}
/// <summary>
/// 接收数据回调
/// </summary>
/// <param name="ar"></param>
private void ReceiveCallback(IAsyncResult ar)
{
try
{
// 从异步状态对象检索状态对象和客户端套接字。
// Retrieve the state object and the client socket from the asynchronous state object.
StateObject state = (StateObject)ar.AsyncState;
Socket client = state.workSocket;
// 从远端设备读取数据。
// Read data from the remote device.
int bytesRead = client.EndReceive(ar);
if (bytesRead > 0)
{
// 可能有更多的数据,所以存储到目前为止收到的数据。
// There might be more data, so store the data received so far.
state.sb.Append(Encoding.ASCII.GetString(state.buffer, 0, bytesRead));
// 获取剩下的数据。
// Get the rest of the data.
client.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReceiveCallback), state);
}
else
{
// 所有的数据都到了;把它作为回复。
// All the data has arrived; put it in response.
if (state.sb.Length > 1)
{
response = state.sb.ToString();
}
// 已接收所有字节的信号。
// Signal that all bytes have been received.
receiveDone.Set();
}
}
catch (Exception e)
{
Log.Error(e.ToString());
}
}
private void Send(Socket client, String data)
{
// 使用ASCII编码将字符串数据转换为字节数据。
// Convert the string data to byte data using ASCII encoding.
byte[] byteData = Encoding.ASCII.GetBytes(data);
// 开始向远端设备发送数据。
// Begin sending the data to the remote device.
client.BeginSend(byteData, 0, byteData.Length, 0, new AsyncCallback(SendCallback), client);
}
/// <summary>
/// 发送成功回调
/// </summary>
/// <param name="ar"></param>
private void SendCallback(IAsyncResult ar)
{
try
{
// 从状态对象检索套接字。
// Retrieve the socket from the state object.
Socket client = (Socket)ar.AsyncState;
// 完成数据发送到远端设备。
// Complete sending the data to the remote device.
int bytesSent = client.EndSend(ar);
Log.Debug($"Sent {bytesSent} bytes to server.");
// 所有字节已发送的信号。
// Signal that all bytes have been sent.
sendDone.Set();
}
catch (Exception e)
{
Log.Error(e.ToString());
}
}
}
异步Socket
相对于同步Socket,它是非阻塞的。 客户端在发送请求给服务器后不会被阻塞,它可以继续运行主线程,当服务器下发答复后再继续在主线程上执行。
异步Socket本身是一个多线程的Socket,内部封装了多线程处理方式。需要注意的是,在Unity中当服务端答复后,不能直接调用游戏逻辑,因为这时候的线程还是多线程,需要转换到主线程后继续执行。
多路复用Select
多路复用一般用于服务器上,是一种CPU友好的Socket,在Socket未接收到消息时会被休眠,当接收到消息后会被唤醒。多路复用也分为阻塞和非阻塞两种类型。
IO多路复用+阻塞式
仅使用一个线程就可以实现对多个描述符的状态管理,但由于IO输入输出调用本身式阻塞的,可能出现IP输入输出过慢,影响其他描述符的效率,从而出现整体性能不高。此方式编程难度较低。
IO多路复用+非阻塞式
IO采用非阻塞式,可以大大降低单个描述符的IP速度对其他IO的影响,不过此种编程难度较高,主要体现在需要考虑一些慢速读写的边界情况,比如黏包、写缓冲不够等。
using NetWork;
using System.Net.Sockets;
using Fox;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Linq;
public class NetManager
{
public static NetManager Instance
{
get; private set;
}
public NetManager()
{
Instance = this;
}
public enum NetEvent
{
ConnectSucc = 1,
ConnectFail = 2,
Close = 3,
}
public string PublicKey = "OceanSever";
public string SecretKey { get; private set; }
private Socket m_Socket;
public static long m_PingInterval = 10;
private ByteArray m_ReadBuff;
private string m_Ip;
private int m_Port;
private Thread m_MsgThread;
private Thread m_HeartThread;
static long lastPingTime;
static long lastPongTime;
private Queue<ByteArray> m_WriteQueue;
private List<MsgBase> m_MsgList;
private List<MsgBase> m_UnityMsgList;
//消息列表长度
private int m_MsgCount = 0;
//连接状态
private bool m_Connecting = false;
private bool m_Closing = false;
private bool m_Diaoxian = false;
//是否连接成功过,只要成功连接就永远为true;
private bool m_IsConnectSuccessed = false;
private bool m_ReConnet = false;
public delegate void EventListener(string str);
private Dictionary<NetEvent, EventListener> m_ListenerDic = new Dictionary<NetEvent, EventListener>();
public delegate void ProtoListener(MsgBase msg);
private Dictionary<ProtocolEnum, ProtoListener> m_ProtoDic = new Dictionary<ProtocolEnum, ProtoListener>();
/// <summary>
/// 监听连接事件
/// </summary>
/// <param name="netEvent"></param>
/// <param name="listener"></param>
public void AddEvnetListener(NetEvent netEvent, EventListener listener)
{
if (m_ListenerDic.ContainsKey(netEvent))
{
m_ListenerDic[netEvent] += listener;
}
else
{
m_ListenerDic[netEvent] = listener;
}
}
public void RemoveEvnetListener(NetEvent netEvent, EventListener listener)
{
if (m_ListenerDic.ContainsKey(netEvent))
{
m_ListenerDic[netEvent] -= listener;
if (m_ListenerDic[netEvent] == null)
{
m_ListenerDic.Remove(netEvent);
}
}
}
void FirstEvent(NetEvent netEvent, string str)
{
if (m_ListenerDic.ContainsKey(netEvent))
{
m_ListenerDic[netEvent](str);
}
}
/// <summary>
/// 一个协议希望只有一个监听
/// </summary>
/// <param name="protocolEnum"></param>
/// <param name="listener"></param>
public void AddProtoListener(ProtocolEnum protocolEnum, ProtoListener listener)
{
m_ProtoDic[protocolEnum] = listener;
}
public void FirstProto(ProtocolEnum protocolEnum, MsgBase msgBase)
{
if (m_ProtoDic.ContainsKey(protocolEnum))
{
m_ProtoDic[protocolEnum](msgBase);
}
}
void PingThread()
{
while (m_Socket != null && m_Socket.Connected)
{
long timeNow = GetTimeStamp();
if (timeNow - lastPingTime > m_PingInterval)
{
MsgPing msgPing = new MsgPing();
SendMessage(msgPing);
lastPingTime = GetTimeStamp();
Log.Debug("客户端发送心跳包");
}
//如果心跳包过长时间没收到,就关闭连接
if (timeNow - lastPongTime > m_PingInterval * 4)
{
Close(false);
}
}
}
/// <summary>
/// 重连方法
/// </summary>
public void ReConnect()
{
Connect(m_Ip,m_Port);
m_ReConnet = true;
}
/// <summary>
/// 连接服务器函数
/// </summary>
/// <param name="ip"></param>
/// <param name="port"></param>
public void Connect(string ip, int port)
{
if (m_Socket != null && m_Socket.Connected)
{
Log.Error("连接失败");
return;
}
if (m_Connecting)
{
Log.Error("连接失败,正在连接中!");
return;
}
InitState();
m_Socket.NoDelay = true;
m_Connecting = true;
m_Socket.BeginConnect(ip, port, ConnectCallback, m_Socket);
m_Ip = ip;
m_Port = port;
}
/// <summary>
/// 初始化变量
/// </summary>
void InitState()
{
//初始化变量
m_Socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
m_ReadBuff = new ByteArray();
m_WriteQueue = new Queue<ByteArray>();
m_Connecting = false;
m_Closing = false;
m_MsgList = new List<MsgBase>();
m_UnityMsgList = new List<MsgBase>();
m_MsgCount = 0;
lastPingTime = GetTimeStamp();
lastPongTime = GetTimeStamp();
}
public void Update()
{
if (m_Diaoxian&&m_IsConnectSuccessed)
{
//弹窗,确定是否重连
//重新连接
ReConnect();
//退出游戏
m_Diaoxian = false;
}
MsgUpdate();
}
void MsgUpdate()
{
if (m_Socket != null && m_Socket.Connected)
{
if (m_MsgCount == 0)
{
return;
}
MsgBase msgBase = null;
lock (m_UnityMsgList)
{
if (m_UnityMsgList.Count > 0)
{
msgBase = m_UnityMsgList[0];
m_UnityMsgList.RemoveAt(0);
m_MsgCount--;
}
}
if (msgBase != null)
{
FirstProto(msgBase.ProtoType, msgBase);
}
}
}
void MsgThread()
{
while (m_Socket != null && m_Socket.Connected)
{
if (m_MsgList.Count <= 0)
continue;
MsgBase msgBase = null;
lock (m_MsgList)
{
if (m_MsgList.Count > 0)
{
msgBase = m_MsgList[0];
m_MsgList.RemoveAt(0);
}
}
if (msgBase != null)
{
if (msgBase is MsgPing)
{
lastPongTime = GetTimeStamp();
Log.Debug("收到心跳包!!");
m_MsgCount--;
}
else
{
lock (m_UnityMsgList)
{
m_UnityMsgList.Add(msgBase);
}
}
}
else
{
break;
}
}
}
/// <summary>
/// 连接回调
/// </summary>
/// <param name="ar"></param>
void ConnectCallback(IAsyncResult ar)
{
try
{
Socket socket = (Socket)ar.AsyncState;
socket.EndConnect(ar);
FirstEvent(NetEvent.ConnectSucc, "");
m_IsConnectSuccessed = true;
m_MsgThread = new Thread(MsgThread);
m_MsgThread.IsBackground = true;
m_MsgThread.Start();
m_HeartThread = new Thread(PingThread);
m_HeartThread.IsBackground = true;
m_HeartThread.Start();
m_Connecting = false;
ProtocolMgr.SecretRequest();
Log.Debug("Socket Connet Success");
m_Socket.BeginReceive(m_ReadBuff.Bytes, m_ReadBuff.WriteIdx, m_ReadBuff.Remain, 0, ReceiveCallBack, socket);
}
catch (SocketException ex)
{
Log.Error("Socket Connect fail:" + ex.ToString());
m_Connecting = false;
}
}
/// <summary>
/// 消息接收回调
/// </summary>
/// <param name="ar"></param>
void ReceiveCallBack(IAsyncResult ar)
{
try
{
Socket socket = (Socket)ar.AsyncState;
int count = socket.EndReceive(ar);
if (count <= 0)
{
Close();
//关闭链接
return;
}
m_ReadBuff.WriteIdx += count;
OnReceiveData();
if (m_ReadBuff.Remain < 8)
{
m_ReadBuff.MoveBytes();
m_ReadBuff.ReSize(m_ReadBuff.Length * 2);
}
socket.BeginReceive(m_ReadBuff.Bytes, m_ReadBuff.WriteIdx, m_ReadBuff.Remain, 0, ReceiveCallBack, socket);
}
catch (SocketException ex)
{
Log.Error("Socket ReceiveCallBack fail:" + ex.ToString());
Close();
}
}
/// <summary>
/// 对数据进行处理
/// </summary>
void OnReceiveData()
{
if (m_ReadBuff.Length <= 4 || m_ReadBuff.ReadIdx < 0)
return;
int readIdx = m_ReadBuff.ReadIdx;
byte[] bytes = m_ReadBuff.Bytes;
int bodyLength = BitConverter.ToInt32(bytes, readIdx);
//读取协议长度之后进行判断,如果消息长度小于读出来的消息长度,证明是没有一条完整的数据
if (m_ReadBuff.Length < bodyLength + 4)
{
return;
}
m_ReadBuff.ReadIdx += 4;
int nameCount = 0;
ProtocolEnum protocol = MsgBase.DecodeName(m_ReadBuff.Bytes, m_ReadBuff.ReadIdx, out nameCount);
if (protocol == ProtocolEnum.None)
{
Log.Error("OnReceiveData MsgBase.DecodeName fail");
Close();
return;
}
m_ReadBuff.ReadIdx += nameCount;
//解析协议体
int bodyCount = bodyLength - nameCount;
try
{
MsgBase msgBase = MsgBase.Decode(protocol, m_ReadBuff.Bytes, m_ReadBuff.ReadIdx, bodyCount);
if (msgBase == null)
{
Log.Error("接受数据协议内容解析出错");
Close();
return;
}
m_ReadBuff.ReadIdx += bodyCount;
m_ReadBuff.CheckAndMoveBytes();
//协议具体的操作
lock (m_MsgList)
{
m_MsgList.Add(msgBase);
}
m_MsgCount++;
//处理粘包
if (m_ReadBuff.Length > 4)
{
OnReceiveData();
}
}
catch (Exception ex)
{
Log.Error("Socket OnReceiveData error:" + ex.ToString());
Close();
}
}
/// <summary>
/// 发送数据到服务器
/// </summary>
/// <param name="msgBase"></param>
public void SendMessage(MsgBase msgBase)
{
if (m_Socket == null || !m_Socket.Connected)
{
return;
}
if (m_Connecting)
{
Log.Error("正在链接服务器中,无法发送消息!");
return;
}
if (m_Closing)
{
Log.Error("正在关闭链接中,无法发送消息!");
return;
}
try
{
byte[] nameBytes = MsgBase.EncodeName(msgBase);
byte[] bodyBytes = MsgBase.Encond(msgBase);
int len = nameBytes.Length + bodyBytes.Length;
byte[] byteHead = BitConverter.GetBytes(len);
byte[] sendBytes = new byte[byteHead.Length + len];
Array.Copy(byteHead, 0, sendBytes, 0, byteHead.Length);
Array.Copy(nameBytes, 0, sendBytes, byteHead.Length, nameBytes.Length);
Array.Copy(bodyBytes, 0, sendBytes, byteHead.Length + nameBytes.Length, bodyBytes.Length);
ByteArray ba = new ByteArray(sendBytes);
int count = 0;
lock (m_WriteQueue)
{
m_WriteQueue.Enqueue(ba);
count = m_WriteQueue.Count;
}
if (count == 1)
{
m_Socket.BeginSend(sendBytes, 0, sendBytes.Length, 0, SendCallBack, m_Socket);
}
}
catch (Exception ex)
{
Log.Error("SendMessage error:" + ex.ToString());
Close();
}
}
/// <summary>
/// 发送结束回调
/// </summary>
/// <param name="ar"></param>
void SendCallBack(IAsyncResult ar)
{
try
{
Socket socket = (Socket)ar.AsyncState;
if (socket == null || !socket.Connected) return;
int count = socket.EndSend(ar);
//判断是否发送完成
ByteArray ba;
lock (m_WriteQueue)
{
ba = m_WriteQueue.First();
}
ba.ReadIdx += count;
//代表发送完整
if (ba.Length == 0)
{
lock (m_WriteQueue)
{
m_WriteQueue.Dequeue();
if (m_WriteQueue.Count > 0)
{
ba = m_WriteQueue.First();
}
else
{
ba = null;
}
}
}
//发送不完整或发送完整且存在第二条数据
if (ba != null)
{
socket.BeginSend(ba.Bytes, ba.ReadIdx, ba.Length, 0, SendCallBack, socket);
}
//确保关闭链接前,先把消息发送出去
else if (m_Closing)
{
RealClose();
}
}
catch (SocketException ex)
{
Log.Error("SendCallBack error:" + ex.ToString());
Close();
}
}
/// <summary>
/// 关闭连接
/// </summary>
public void Close(bool normal = true)
{
if (m_Socket == null || m_Connecting)
{
return;
}
if (m_Connecting) return;
if (m_WriteQueue.Count > 0)
{
m_Closing = true;
}
else
{
RealClose(normal);
}
}
void RealClose(bool normal = true)
{
SecretKey = "";
m_Socket.Close();
FirstEvent(NetEvent.Close, normal.ToString());
m_Diaoxian = true;
if (m_HeartThread != null && m_HeartThread.IsAlive)
{
m_HeartThread.Abort();
m_HeartThread = null;
}
if (m_MsgThread != null && m_MsgThread.IsAlive)
{
m_MsgThread.Abort();
m_MsgThread = null;
}
Log.Debug("Close Socket");
}
public void SetKey(string key)
{
SecretKey = key;
}
public static long GetTimeStamp()
{
TimeSpan ts = DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, 0);
return Convert.ToInt64(ts.TotalSeconds);
}
}
SocketAsyncEventArgs
SocketAsyncEventArgs是一种增强Socket,可供专用高性能套接字应用程序使用的替代异步Socket。这些增强功能的主要功能是避免在大容量异步套接字 I/O 期间重复分配和同步对象。
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
internal class AsyncUserToken
{
public System.Net.Sockets.Socket Socket { get; set; }
}
//这个类创建一个单独的大缓冲区,可以分割并分配给SocketAsyncEventArgs对象,用于每个socket I/O操作。
//这使缓冲区易于重用,并防止堆内存碎片化。
//在BufferManager类上公开的操作不是线程安全的。
// This class creates a single large buffer which can be divided up and assigned to SocketAsyncEventArgs objects for use with each socket I/O operation.
// This enables bufffers to be easily reused and guards against fragmenting heap memory.
// The operations exposed on the BufferManager class are not thread safe.
class BufferManager
{
//由缓冲池控制的总字节数
int m_numBytes; // the total number of bytes controlled by the buffer pool
//缓冲区管理器维护的底层字节数组
byte[] m_buffer; // the underlying byte array maintained by the Buffer Manager
Stack<int> m_freeIndexPool; //
int m_currentIndex; //当前指针
int m_bufferSize; //byte[]可用大小
public BufferManager(int totalBytes, int bufferSize)
{
//totalBytes=最大连接数*bufferSize*2
//bufferSize=Socket最大IO容量
m_numBytes = totalBytes;
m_currentIndex = 0;
m_bufferSize = bufferSize;
m_freeIndexPool = new Stack<int>();
}
// 分配缓冲池使用的缓冲空间
// Allocates buffer space used by the buffer pool
public void InitBuffer()
{
// 创建一个大的缓冲区并将其分割
// 输出到每个SocketAsyncEventArg对象
// create one big large buffer and divide that
// out to each SocketAsyncEventArg object
m_buffer = new byte[m_numBytes];
}
// 从缓冲池中分配一个缓冲区给指定的SocketAsyncEventArgs对象
// 如果缓冲区设置成功,则返回true,否则返回false
// Assigns a buffer from the buffer pool to the specified SocketAsyncEventArgs object
// <returns>true if the buffer was successfully set, else false</returns>
public bool SetBuffer(SocketAsyncEventArgs args)
{
if (m_freeIndexPool.Count > 0)
{
//设置异步Socket的缓冲区
// byte[] byte[]中操作开始位置 byte[]可接收最大数据量
args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize);
}
else
{
if ((m_numBytes - m_bufferSize) < m_currentIndex)
{
return false;
}
args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize);
m_currentIndex += m_bufferSize;
}
return true;
}
//从SocketAsyncEventArg对象中移除缓冲区。并将缓冲区释放回缓冲池
// Removes the buffer from a SocketAsyncEventArg object.
// This frees the buffer back to the buffer pool
public void FreeBuffer(SocketAsyncEventArgs args)
{
//Offset:byte[]偏移量
m_freeIndexPool.Push(args.Offset);
args.SetBuffer(null, 0, 0);
}
}
// 表示可重用的SocketAsyncEventArgs对象的集合。
// Represents a collection of reusable SocketAsyncEventArgs objects.
class SocketAsyncEventArgsPool
{
Stack<SocketAsyncEventArgs> m_pool;
//将对象池初始化为指定的大小
//“capacity”参数是该池可以容纳的SocketAsyncEventArgs对象的最大数量
// Initializes the object pool to the specified size
// The "capacity" parameter is the maximum number of SocketAsyncEventArgs objects the pool can hold
public SocketAsyncEventArgsPool(int capacity)
{
m_pool = new Stack<SocketAsyncEventArgs>(capacity);
}
//添加一个SocketAsyncEventArg实例到池中
//“item”参数是要添加到池中的SocketAsyncEventArgs实例
// Add a SocketAsyncEventArg instance to the pool
//The "item" parameter is the SocketAsyncEventArgs instance to add to the pool
public void Push(SocketAsyncEventArgs item)
{
if (item == null) { throw new ArgumentNullException("Items added to a SocketAsyncEventArgsPool cannot be null"); }
lock (m_pool)
{
m_pool.Push(item);
}
}
//从池中移除一个SocketAsyncEventArgs实例并返回从池中移除的对象
// Removes a SocketAsyncEventArgs instance from the pool and returns the object removed from the pool
public SocketAsyncEventArgs Pop()
{
lock (m_pool)
{
return m_pool.Pop();
}
}
// SocketAsyncEventArgs实例的数量
// The number of SocketAsyncEventArgs instances in the pool
public int Count
{
get { return m_pool.Count; }
}
}
//为socket服务器实现连接逻辑。
//接收连接后,从客户端读取的所有数据都被发送回客户端。
//继续读取并回传给客户端模式,直到客户端断开连接。
// Implements the connection logic for the socket server.
// After accepting a connection, all data read from the client is sent back to the client.
// The read and echo back to the client pattern is continued until the client disconnects.
class Server
{
//同时处理的最大连接数
private int m_numConnections; // the maximum number of connections the sample is designed to handle simultaneously
//每个socket I/O操作使用的缓冲区大小
private int m_receiveBufferSize;// buffer size to use for each socket I/O operation
//表示所有套接字操作的一组可重用的缓冲区
BufferManager m_bufferManager; // represents a large reusable set of buffers for all socket operations
//读取,写入(不为接收分配缓冲区空间)
const int opsToPreAlloc = 2; // read, write (don't alloc buffer space for accepts)
//监听传入连接请求的套接字
Socket listenSocket; // the socket used to listen for incoming connection requests
//用于写、读和接受套接字操作的可重用SocketAsyncEventArgs对象池
SocketAsyncEventArgsPool m_readWritePool; // pool of reusable SocketAsyncEventArgs objects for write, read and accept socket operations
//服务器接收到的字节总数
int m_totalBytesRead; // counter of the total # bytes received by the server
//连接到服务器的客户端总数
int m_numConnectedSockets; // the total number of clients connected to the server
//限制可同时访问某一资源或资源池的线程数
Semaphore m_maxNumberAcceptedClients;
//创建一个未初始化的服务器实例
//要启动服务器监听连接请求,先调用Init方法,再调用start方法
/// <param name="numConnections">该示例设计用于同时处理的最大连接数</param>
/// <param name="receiveBufferSize">每个套接字I/O操作使用的缓冲区大小</param>
// Create an uninitialized server instance.
// To start the server listening for connection requests call the Init method followed by Start method
// <param name="numConnections">the maximum number of connections the sample is designed to handle simultaneously</param>
// <param name="receiveBufferSize">buffer size to use for each socket I/O operation</param>
public Server(int numConnections, int receiveBufferSize)
{
m_totalBytesRead = 0;
m_numConnectedSockets = 0;
m_numConnections = numConnections;
m_receiveBufferSize = receiveBufferSize;
// 分配缓冲区,使最大数量的套接字可以有一个未完成的读和写同时提交给套接字
// allocate buffers such that the maximum number of sockets can have one outstanding read and write posted to the socket simultaneously
m_bufferManager = new BufferManager(m_receiveBufferSize * numConnections * opsToPreAlloc, m_receiveBufferSize);
m_readWritePool = new SocketAsyncEventArgsPool(numConnections);
m_maxNumberAcceptedClients = new Semaphore(numConnections, numConnections);
}
//通过预分配可重用的缓冲区和上下文对象来初始化服务器。
//这些对象不需要预先分配或重用,但这样做是为了说明如何方便地使用API来创建可重用对象,以提高服务器性能。
// Initializes the server by preallocating reusable buffers and context objects.
// These objects do not need to be preallocated or reused, but it is done this way to illustrate how the API can easily be used to create reusable objects to increase server performance.
public void Init()
{
//分配一个大字节缓冲区,所有I/O操作使用其中的一块。
//防止内存碎片
// Allocates one large byte buffer which all I/O operations use a piece of.
// This gaurds against memory fragmentation
m_bufferManager.InitBuffer();
//预分配SocketAsyncEventArgs对象池
// preallocate pool of SocketAsyncEventArgs objects
SocketAsyncEventArgs readWriteEventArg;
for (int i = 0; i < m_numConnections; i++)
{
//预分配一组可重用的SocketAsyncEventArgs
//Pre-allocate a set of reusable SocketAsyncEventArgs
readWriteEventArg = new SocketAsyncEventArgs();
//设置回调方法
readWriteEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
//设置用户信息
readWriteEventArg.UserToken = new AsyncUserToken();
//设置缓冲区,从缓冲池中分配一个字节缓冲区给SocketAsyncEventArg对象
// assign a byte buffer from the buffer pool to the SocketAsyncEventArg object
m_bufferManager.SetBuffer(readWriteEventArg);
//将SocketAsyncEventArg添加到池中
// add SocketAsyncEventArg to the pool
m_readWritePool.Push(readWriteEventArg);
}
}
//启动服务器,使其监听传入的连接
//服务器将侦听连接请求的端点
// Starts the server such that it is listening for incoming connection requests.
// <param name="localEndPoint">The endpoint which the server will listening for connection requests on</param>
public void Start(IPEndPoint localEndPoint)
{
//创建监听传入连接的套接字
// create the socket which listens for incoming connections
listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
listenSocket.Bind(localEndPoint);
//用100个连接来启动服务器
// start the server with a listen backlog of 100 connections
listenSocket.Listen(100);
//接收监听套接字
// post accepts on the listening socket
StartAccept(null);
//按任意键终止服务器进程
Console.WriteLine("Press any key to terminate the server process....");
Console.ReadKey();
}
//开始一个操作,接受来自客户端的连接请求
//在服务器监听套接字上发出accept操作时使用的上下文对象
// Begins an operation to accept a connection request from the client
// <param name="acceptEventArg">The context object to use when issuing the accept operation on the server's listening socket</param>
public void StartAccept(SocketAsyncEventArgs acceptEventArg)
{
if (acceptEventArg == null)
{
acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);
}
else
{
// socket必须被清除,因为上下文对象正在被重用
// socket must be cleared since the context object is being reused
acceptEventArg.AcceptSocket = null;
}
m_maxNumberAcceptedClients.WaitOne();
//开始一个异步操作,接收客户端的连接请求
bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArg);
//如果 I/O 操作挂起,则为 true。如果 I/O 操作同步完成,则为 false。
if (!willRaiseEvent)
{
ProcessAccept(acceptEventArg);
}
}
//该方法是与Socket相关联的回调方法。
//执行AcceptAsync操作,并在接受操作完成时调用
// This method is the callback method associated with Socket.
// AcceptAsync operations and is invoked when an accept operation is complete
void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)
{
ProcessAccept(e);
}
private void ProcessAccept(SocketAsyncEventArgs e)
{
//在线程中以原子操作的形式递增指定变量的值并存储结果。
Interlocked.Increment(ref m_numConnectedSockets);
//接收客户端连接。 有{0}客户端连接到服务器
Console.WriteLine("Client connection accepted. There are {0} clients connected to the server", m_numConnectedSockets);
//获取接受的客户端连接的套接字,并将其放入ReadEventArg对象用户令牌中
// Get the socket for the accepted client connection and put it into the ReadEventArg object user token
SocketAsyncEventArgs readEventArgs = m_readWritePool.Pop();
((AsyncUserToken)readEventArgs.UserToken).Socket = e.AcceptSocket;
//一旦客户端连接,开启一个异步请求接收连接中的数据
// As soon as the client is connected, post a receive to the connection
bool willRaiseEvent = e.AcceptSocket.ReceiveAsync(readEventArgs);
if (!willRaiseEvent)
{
ProcessReceive(readEventArgs);
}
//接收下一个连接请求
// Accept the next connection request
StartAccept(e);
}
//在套接字上完成接收或发送操作时调用此方法
//与完成的接收操作相关联的SocketAsyncEventArg
// This method is called whenever a receive or send operation is completed on a socket
// <param name="e">SocketAsyncEventArg associated with the completed receive operation</param>
void IO_Completed(object sender, SocketAsyncEventArgs e)
{
// 确定刚刚完成的操作类型并调用相关的处理程序
// determine which type of operation just completed and call the associated handler
switch (e.LastOperation)
{
case SocketAsyncOperation.Receive:
ProcessReceive(e);
break;
case SocketAsyncOperation.Send:
ProcessSend(e);
break;
default:
throw new ArgumentException("The last operation completed on the socket was not a receive or send");
}
}
//当异步接收操作完成时调用此方法。
//如果远程主机关闭连接,则套接字关闭。
//如果接收到数据,则数据回显给客户端。
// This method is invoked when an asynchronous receive operation completes.
// If the remote host closed the connection, then the socket is closed.
// If data was received then the data is echoed back to the client.
private void ProcessReceive(SocketAsyncEventArgs e)
{
//检查远程主机是否关闭了连接
// check if the remote host closed the connection
AsyncUserToken token = (AsyncUserToken)e.UserToken;
//BytesTransferred:套接字操作中传输的字节数,SocketError:获取异步套接字操作的结果。
if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
{
//增加服务器接收的总字节数
//increment the count of the total bytes receive by the server
Interlocked.Add(ref m_totalBytesRead, e.BytesTransferred);
//服务器已经读取了{0}字节
Console.WriteLine("The server has read a total of {0} bytes", m_totalBytesRead);
//将接收到的数据返回给客户端。Offset:缓冲区偏移量
//echo the data received back to the client
e.SetBuffer(e.Offset, e.BytesTransferred);
bool willRaiseEvent = token.Socket.SendAsync(e);
if (!willRaiseEvent)
{
ProcessSend(e);
}
}
else
{
CloseClientSocket(e);
}
}
//当异步发送操作完成时调用此方法。
//该方法在套接字上发出另一个receive来读取从客户端发送的任何额外数据
// This method is invoked when an asynchronous send operation completes.
// The method issues another receive on the socket to read any additional data sent from the client
private void ProcessSend(SocketAsyncEventArgs e)
{
if (e.SocketError == SocketError.Success)
{
//将数据返回给客户端
// done echoing data back to the client
AsyncUserToken token = (AsyncUserToken)e.UserToken;
//读取客户端发送的下一个数据块
// read the next block of data send from the client
bool willRaiseEvent = token.Socket.ReceiveAsync(e);
if (!willRaiseEvent)
{
ProcessReceive(e);
}
}
else
{
CloseClientSocket(e);
}
}
// 断开客户端Socket连接
private void CloseClientSocket(SocketAsyncEventArgs e)
{
AsyncUserToken token = e.UserToken as AsyncUserToken;
//关闭与客户端关联的套接字
// close the socket associated with the client
try
{
token.Socket.Shutdown(SocketShutdown.Send);
}
//如果客户端进程已经关闭
// throws if client process has already closed
catch (Exception) { }
token.Socket.Close();
//递减计数器,跟踪连接到服务器的客户端总数
// decrement the counter keeping track of the total number of clients connected to the server
Interlocked.Decrement(ref m_numConnectedSockets);
//释放SocketAsyncEventArg以便它们可以被其他客户端重用
// Free the SocketAsyncEventArg so they can be reused by another client
m_readWritePool.Push(e);
m_maxNumberAcceptedClients.Release();
//客户端与服务器断开连接。 有{0}客户端连接到服务器
Console.WriteLine("A client has been disconnected from the server. There are {0} clients connected to the server", m_numConnectedSockets);
}
}
数据包规则
新的问题,如何保证读取到数据不会出现黏包与分包。
如何解决这一问题,我们需要指定一个消息规则如下图,客户端在发送消息的时候按照如下规则进行封包,数据流到服务端后根据对应规则进行反向解包。重点是图中的包头部分,固定为4个字节的空间,服务端接收到数据后可以根据这4个字节得出后续的包体长度,如果后续包体长度不满足,则是消息分包了,继续等待Socket接收即可,如果包体长度多余包头,那是黏包了,只读取包头大小的数据进行解析即可,剩余未读取的数据则是下一个消息。
数据缓冲区
最后就是数据缓冲区了
数据缓冲区是用来存储从Socket中读取、写入的数据的一个byte[]数组。
首先数据缓冲区肯定不是一个固定大小的数组,因为如果传输的数据流过大(玩家发了一段超长的文字),肯定会超出数组的大小,这时候就需要对原有的byte[]进行扩充,创建一个双倍当前数组容量的byte[],然后将原有的byte[]复制到当前新的byte[]。
新的方案:
建立两个Queue,一个用来存储正在使用的byte[],一个用来存储未使用的byte[](充当对象池)。
使用两个指针,前指针用来记录本地读取数据的节点,后指针用于记录Socket写入数据的位置。当读取完一个byte[]数组的数据后,将其保存到对象池中,然后将前指针归零。当Socket写满一个byte[]后从对象池中拿一个新的byte[]继续写入(对象池如果没有则创建一个新的),然后将后指针归零。
这样就可以循环利用byte[],不会出现扩充数组时Copy产生的GC。