远程过程调用(RPC)协议详解
- 什么是RPC协议
- RPC的基本原理
- RPC的关键组件
- RPC的优缺点
- Protobuf
- 函数绑定
- Call
- Encode
- Recv
- Decode
- Socket.Send和Recv
- 项目地址
什么是RPC协议
远程过程调用(Remote Procedure Call,简称RPC)是一种网络通信协议,允许程序在不同的地址空间(通常在不同的物理计算机上)中调用彼此的方法,好像它们是在本地执行的一样。RPC隐藏了底层的网络通信细节,使开发人员能够像调用本地函数一样简单地调用远程服务。
RPC的基本原理
RPC的工作原理基于客户端-服务器模型,主要包括以下步骤:
1.客户端调用:客户端程序发起对某个远程过程的调用请求。
2.请求打包:调用参数被打包成消息,发送到服务器。
3.服务器解包和执行:服务器接收到消息,解包获取调用参数,执行相应的远程过程。
4.结果打包和返回:执行结果被打包成消息,发送回客户端。
5.客户端接收结果:客户端解包消息,获取调用结果
RPC的关键组件
1.客户端代理:负责将本地调用请求转换为远程调用请求,打包参数,并通过网络发送给服务器。
2.服务器代理:负责接收客户端的请求,解包参数,调用相应的服务方法,并将结果打包返回给客户端。
3.通信协议:定义客户端和服务器之间如何通信,常见的协议有HTTP、TCP等。
4.编解码器:负责参数和结果的序列化和反序列化,常见的格式有JSON、XML、Protobuf等。
RPC的优缺点
优点
1.简化远程调用:使得远程调用像本地调用一样简单,开发人员无需关心底层的网络通信细节。
2.语言无关:大多数RPC框架支持多种编程语言,方便不同语言的系统互操作。
缺点
1.调试困难:由于涉及网络通信,调试远程调用的问题比本地调用更加复杂。
2.可靠性要求高:需要处理网络延迟、丢包、超时等问题,增加了系统的复杂性。
3.耦合性:客户端和服务器需要共同遵循同一套接口定义,一旦接口发生变化,可能需要同时更新多个系统。
Protobuf
protoc.exe 生成C#文件
Gen.bat
@echo off
rem 设置路径变量
set PROTOC_PATH="protoc.exe"
set PROTO_DIR="Protos"
set OUTPUT_DIR="ProtocolCodes"
rem 创建日志头
echo .......................proto2C#.......................
echo.
rem 检查目录是否存在
if not exist %PROTO_DIR% (
echo Error: Protocols directory does not exist.
echo Please create the Protocols directory and place your .proto files in it.
echo.
pause
exit /b
)
rem 创建输出目录
if not exist %OUTPUT_DIR% mkdir %OUTPUT_DIR%
rem 批量处理 .proto 文件
for %%f in (%PROTO_DIR%\*.proto) do (
echo %%f complete
%PROTOC_PATH% --proto_path=%PROTO_DIR% --csharp_out=%OUTPUT_DIR% %%f
)
echo code generation complete. Press any key to close.
pause > nul
函数绑定
1.使用反射自动获取所有RPC函数, 对其进行Hash绑定
函数的定义 RPCMsgHandles.cs
public sealed class RPCMsgHandles
{
private static void ReqMove(int unitId, Move move)
{
}
private static void RecvAttack(int skillid, Attack attack, ItemList itemList)
{
LogHelper.Log($"Recv: skillid = {attack.Id}, targetId = {attack.TargetId}, itemList.Count = {itemList.Items.Count}");
}
private static void RecvDelete(int msg)
{
LogHelper.Log($"Recv: state = {msg}");
}
private static void RecvReflectMove(Move move)
{
LogHelper.Log($"move reflect sync: x:{move.X}, y:{move.Y}, speed:{move.Speed}, dir:{move.Dir}");
}
}
使用反射进行函数绑定 RPCMoudle.cs
public sealed class RPCMoudle
{
private static Dictionary<int, IRPC> _msg = new Dictionary<int, IRPC>();
public static void Init()
{
System.Type type = typeof(RPCMsgHandles);
MethodInfo[] methods = type.GetMethods(BindingFlags.Static | BindingFlags.NonPublic);
foreach (MethodInfo methodInfo in methods)
{
RPC method = new RPC(methodInfo);
int index = 0;
ParameterInfo[] infos = methodInfo.GetParameters();
foreach (var info in infos)
{
if (typeof(IMessage).IsAssignableFrom(info.ParameterType))
{
IMessage message = Activator.CreateInstance(info.ParameterType) as IMessage;
method.AddParamType(DateType.Message);
method.AddParam(index, message);
}
else
{
DateType dateType = GetDateType(info.ParameterType);
method.AddParamType(dateType);
}
index++;
}
int hash = Globals.Hash(methodInfo.Name);
if (_msg.ContainsKey(hash))
throw new Exception("AddParamType rpc _method hash conflict: " + methodInfo.Name);
_msg.Add(hash, method);
}
}
}
2.使用泛型手动进行RPC函数绑定
泛型类进行函数绑定 RPCMoudle.cs
public static void Register<T>(string methodName, Action<T> action) where T : class, IMessage, new()
{
int id = Globals.Hash(methodName);
RPCStatic<T> method = new RPCStatic<T>();
method.Register(action, new T());
if (_msg.ContainsKey(id))
{
LogHelper.LogError($"repeat id, id = {id}");
}
_msg[id] = method;
}
public static void Unregister(string methodName)
{
int id = Globals.Hash(methodName);
if (_msg.ContainsKey(id))
{
_msg.Remove(id);
}
else
{
LogHelper.LogError($"no find method, id = {id}");
}
}
Call
Call的实现, encode数据到byte[],第一个参数必须为远程函数名字, 用于将函数名字的hashid写入数据头中, 这样远程服务器在解析数据的时候会先解析4字节的数据头表示函数的hashid
Call中的Send函数 是Socket发送协议, Send函数中会在数据头中写入数据的长度, 在接收方根据数据的长度接收完整数据 防止粘包
Call函数有多个方法重载, 根据业务需求使用
1.public static void Call(string methodName, IMessage message) 类型安全, 类型固定
2.public static void Call(string id, params object[] args) 类型不安全, 可以传入任何参数, 使用更加方便快捷
具体调用例子
Move move = new Move();
move.X = 10;
move.Y = 20;
move.Speed = 100;
move.Dir = 20;
这里使用的是object[] args 类型不安全, 也会有装箱拆箱的开销, 使用这用方式需要前后端统一类型
使用起来简单方便, 业务逻辑开发上使用较为方便
比如请求领取奖励 RPCMoudle.Call("ReqAward", 传入表奖励id);
比如请求保存勾选 RPCMoudle.Call("Save", true);
RPCMoudle.Call("ReqMove", 10016, move);
这样是类型安全的, 也不会存在装箱拆箱的开销
更加高效, 战斗场景较为适合
RPCMoudle.Call("ReqMove", move);
Call的实现, 将数据进行Encode转换成二进制
public static void Call(string methodName, IMessage message)
{
if (message == null)
return;
try
{
int id = Globals.Hash(methodName);
int offset = 0;
BuffMessage msg = GameFrame.message.GetBuffMessage();
BitConverter.TryWriteBytes(msg.bytes.AsSpan(offset), id);
offset += sizeof(int);
BitConverterHelper.WriteString(msg.bytes, ref offset, GameFrame.myRole.Id);
BitConverterHelper.WriteMessage(msg.bytes, ref offset, message);
msg.length = offset;
Main.Instance.Send(msg);
}
catch(Exception ex)
{
LogHelper.LogError(ex.ToString());
}
}
public static void Call(string id, params object[] args)
{
try
{
Profiler.BeginSample("rpc call");
int hash = Globals.Hash(id);
BuffMessage msg = Encode(hash, args);
Main.Instance.Send(msg);
Profiler.EndSample();
}
catch(Exception ex)
{
LogHelper.LogError(ex.ToString());
}
}
Encode
Encode函数的实现
private static BuffMessage Encode(int id, params object[] args)
{
int offset = 0;
BuffMessage msg = GameFrame.message.GetBuffMessage();
BitConverter.TryWriteBytes(msg.bytes.AsSpan(offset), id);
offset += sizeof(int);
BitConverterHelper.WriteString(msg.bytes, ref offset, GameFrame.myRole.Id);
foreach (object arg in args)
{
try
{
System.Type type = arg.GetType();
switch (arg)
{
case IMessage:
BitConverterHelper.WriteMessage(msg.bytes, ref offset, (IMessage)arg);
break;
case Int16:
BitConverterHelper.WriteInt16(msg.bytes, ref offset, (Int16)arg);
break;
case Int32:
BitConverterHelper.WriteInt32(msg.bytes, ref offset, (Int32)arg);
break;
case Int64:
BitConverterHelper.WriteInt64(msg.bytes, ref offset, (Int64)arg);
break;
case UInt16:
BitConverterHelper.WriteUInt16(msg.bytes, ref offset, (UInt16)arg);
break;
case UInt32:
BitConverterHelper.WriteUInt32(msg.bytes, ref offset, (UInt32)arg);
break;
case UInt64:
BitConverterHelper.WriteUInt64(msg.bytes, ref offset, (UInt64)arg);
break;
case bool:
BitConverterHelper.WriteBool(msg.bytes, ref offset, (bool)arg);
break;
case Byte:
BitConverterHelper.WriteByte(msg.bytes, ref offset, (byte)arg);
break;
case SByte:
BitConverterHelper.WriteByte(msg.bytes, ref offset, (byte)arg);
break;
case Char:
BitConverterHelper.WriteChar(msg.bytes, ref offset, (Char)arg);
break;
case Single:
BitConverterHelper.WriteSingle(msg.bytes, ref offset, (Single)arg);
break;
case Double:
BitConverterHelper.WriteDouble(msg.bytes, ref offset, (Double)arg);
break;
case string:
BitConverterHelper.WriteString(msg.bytes, ref offset, (string)arg);
break;
}
}
catch(Exception ex)
{
LogHelper.LogError($"id: {id}, " + ex.ToString());
msg.Dispose();
return msg;
}
}
msg.length = offset;
return msg;
}
0GC的TryWriteBytes方案
namespace Game
{
public static class BitConverterHelper
{
private static readonly int BUFFER_SIZE = 1024 * 1024;
private static readonly byte[] buffer = new byte[BUFFER_SIZE];
private static CodedOutputStream _stream;
private static Stopwatch _watch;
public static void Init()
{
CreateStream();
_watch = new Stopwatch();
_watch.Start();
}
private static void CreateStream()
{
if (_stream != null)
_stream.Dispose();
if (_watch != null)
{
_watch.Stop();
LogHelper.LogWarning($"create stream interval time: {_watch.ElapsedMilliseconds / 1000.0f} s");
_watch.Restart();
}
_stream = new CodedOutputStream(buffer);
}
private static Span<byte> ToByteArray(IMessage message)
{
if (message == null)
return new byte[0];
int length = message.CalculateSize();
if (length == 0)
return new byte[0];
if (length >= BUFFER_SIZE)
{
throw new Exception($"overflow: message length >= {BUFFER_SIZE}");
}
if (_stream.Position + length >= BUFFER_SIZE)
CreateStream();
int position = (int)_stream.Position;
message.WriteTo(_stream);
return buffer.AsSpan(position, length);
}
public static void WriteInt16(byte[] buffer, ref int offset, Int16 arg)
{
Check(buffer, offset + 1);
buffer[offset++] = (byte)DateType.Int16;
Check(buffer, offset + sizeof(Int16));
BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
offset += sizeof(Int16);
}
public static void WriteInt32(byte[] buffer, ref int offset, Int32 arg)
{
Check(buffer, offset + 1);
buffer[offset++] = (byte)DateType.Int32;
Check(buffer, offset + sizeof(Int32));
BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
offset += sizeof(Int32);
}
public static void WriteInt64(byte[] buffer, ref int offset, Int64 arg)
{
Check(buffer, offset + 1);
buffer[offset++] = (byte)DateType.Int64;
Check(buffer, offset + sizeof(Int64));
BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
offset += sizeof(Int64);
}
public static void WriteUInt16(byte[] buffer, ref int offset, UInt16 arg)
{
Check(buffer, offset + 1);
buffer[offset++] = (byte)DateType.UInt16;
Check(buffer, offset + sizeof(UInt16));
BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
offset += sizeof(UInt16);
}
public static void WriteUInt32(byte[] buffer, ref int offset, UInt32 arg)
{
Check(buffer, offset + 1);
buffer[offset++] = (byte)DateType.UInt32;
Check(buffer, offset + sizeof(UInt32));
BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
offset += sizeof(UInt32);
}
public static void WriteUInt64(byte[] buffer, ref int offset, UInt64 arg)
{
Check(buffer, offset + 1);
buffer[offset++] = (byte)DateType.UInt64;
Check(buffer, offset + sizeof(UInt64));
BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
offset += sizeof(UInt64);
}
public static void WriteBool(byte[] buffer, ref int offset, bool arg)
{
Check(buffer, offset + 1);
buffer[offset++] = (byte)DateType.Boolean;
Check(buffer, offset + sizeof(bool));
BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
offset += sizeof(bool);
}
public static void WriteByte(byte[] buffer, ref int offset, byte arg)
{
Check(buffer, offset + 1);
buffer[offset++] = (byte)DateType.Byte;
Check(buffer, offset + 1);
buffer[offset++] = arg;
}
public static void WriteChar(byte[] buffer, ref int offset, Char arg)
{
Check(buffer, offset + 1);
buffer[offset++] = (byte)DateType.Char;
Check(buffer, offset + sizeof(Char));
BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
offset += sizeof(Char);
}
public static void WriteSingle(byte[] buffer, ref int offset, Single arg)
{
Check(buffer, offset + 1);
buffer[offset++] = (byte)DateType.Single;
Check(buffer, offset + sizeof(Single));
BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
offset += sizeof(Single);
}
public static void WriteDouble(byte[] buffer, ref int offset, Double arg)
{
Check(buffer, offset + 1);
buffer[offset++] = (byte)DateType.Double;
Check(buffer, offset + sizeof(Double));
BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
offset += sizeof(Double);
}
public static void WriteString(byte[] buffer, ref int offset, string arg)
{
Check(buffer, offset + 1);
buffer[offset++] = (byte)DateType.String;
byte[] bytes = Encoding.UTF8.GetBytes(arg);
Check(buffer, offset + bytes.Length);
BitConverter.TryWriteBytes(buffer.AsSpan(offset), bytes.Length);
offset += sizeof(int);
Span<byte> target = new Span<byte>(buffer, offset, buffer.Length - offset);
bytes.CopyTo(target);
offset += bytes.Length;
}
public static void WriteMessage(byte[] buffer, ref int offset, IMessage arg)
{
IMessage message = arg;
Span<byte> bytes = ToByteArray(message);
Check(buffer, offset + 1);
buffer[offset++] = (byte)DateType.Message;
Check(buffer, offset + bytes.Length);
BitConverter.TryWriteBytes(buffer.AsSpan(offset), bytes.Length);
offset += sizeof(int);
Span<byte> target = new Span<byte>(buffer, offset, bytes.Length);
bytes.CopyTo(target);
offset += bytes.Length;
}
private static void Check(byte[] buffer, int offset)
{
if (offset >= buffer.Length)
throw new Exception($"date length: {offset} > {Globals.DATA_SZIE}, Invalid data!!");
}
public static void Dispose()
{
_stream?.Dispose();
_stream = null;
}
}
}
Recv
Decode 数据解析,调用本地方法
public static void OnRPC(BuffMessage msg)
{
if(msg == null)
{
LogHelper.LogError("socket recv error, msg == null");
return;
}
Decode(msg.bytes);
}
Decode
0GC的Decode方案
private static void Decode(byte[] buffer)
{
if (buffer == null || buffer.Length < sizeof(int))
{
LogHelper.LogError("Invalid buffer received");
return;
}
int protoId = BitConverter.ToInt32(buffer, 0);
if (!_msg.TryGetValue(protoId, out IRPC method))
{
LogHelper.LogError($"Method not found for protoId: {protoId}");
return;
}
BuffMessage buffMessage = GameFrame.message.GetBuffMessage();
try
{
Array.Copy(buffer, sizeof(int), buffMessage.bytes, 0, buffer.Length - sizeof(int));
method.Decode(buffMessage.bytes);
}
catch (Exception ex)
{
LogHelper.LogError($"Error invoking method for protoId {protoId}: {ex.Message}");
}
finally
{
GameFrame.message.PutBuffMessage(buffMessage);
}
}
namespace Game
{
public interface IRPC : IDisposable
{
public void Decode(byte[] buffer);
}
public abstract class RPCBase : IRPC
{
protected byte[] buffer;
public abstract void Decode(byte[] buffer);
protected ReadOnlySpan<byte> ReadData(DateType type, ref int offset)
{
ReadOnlySpan<byte> data = null;
int length = GetLength(type);
if (length > 0)
{
data = new ReadOnlySpan<byte>(buffer, offset, length);
offset += length;
}
return data;
}
protected bool ToBoolean(ref int offset)
{
ReadOnlySpan<byte> data = ReadData(DateType.Boolean, ref offset);
return BitConverter.ToBoolean(data);
}
protected Byte ToByte(ref int offset)
{
ReadOnlySpan<byte> data = ReadData(DateType.Char, ref offset);
return data[0];
}
protected char ToChar(ref int offset)
{
ReadOnlySpan<byte> data = ReadData(DateType.Char, ref offset);
return BitConverter.ToChar(data);
}
protected Int16 ToInt16(ref int offset)
{
ReadOnlySpan<byte> data = ReadData(DateType.Int16, ref offset);
return BitConverter.ToInt16(data);
}
protected UInt16 ToUInt16(ref int offset)
{
ReadOnlySpan<byte> data = ReadData(DateType.UInt16, ref offset);
return BitConverter.ToUInt16(data);
}
protected Int32 ToInt32(ref int offset)
{
ReadOnlySpan<byte> data = ReadData(DateType.Int32, ref offset);
return BitConverter.ToInt32(data);
}
protected UInt32 ToUInt32(ref int offset)
{
ReadOnlySpan<byte> data = ReadData(DateType.UInt32, ref offset);
return BitConverter.ToUInt32(data);
}
protected Int64 ToInt64(ref int offset)
{
ReadOnlySpan<byte> data = ReadData(DateType.Int64, ref offset);
return BitConverter.ToInt64(data);
}
protected UInt64 ToUInt64(ref int offset)
{
ReadOnlySpan<byte> data = ReadData(DateType.UInt64, ref offset);
return BitConverter.ToUInt64(data);
}
protected Single ToSingle(ref int offset)
{
ReadOnlySpan<byte> data = ReadData(DateType.Single, ref offset);
return BitConverter.ToSingle(data);
}
protected Double ToDouble(ref int offset)
{
ReadOnlySpan<byte> data = ReadData(DateType.Double, ref offset);
return BitConverter.ToDouble(data);
}
protected string ToString(ref int offset)
{
ReadOnlySpan<byte> data = ReadData(DateType.String, ref offset);
return DecodeString(ref offset);
}
protected IMessage ToMessage(ref int offset, IMessage message)
{
ReadOnlySpan<byte> data = ReadData(DateType.Message, ref offset);
return DecodeMessage(ref offset, message);
}
private IMessage DecodeMessage(ref int offset, IMessage message)
{
int length = BitConverter.ToInt32(buffer, offset);
offset += sizeof(int);
ReadOnlySpan<byte> messageData = new ReadOnlySpan<byte>(buffer, offset, length);
offset += length;
return message.Descriptor.Parser.ParseFrom(messageData)!;
}
private string DecodeString(ref int offset)
{
int length = BitConverter.ToInt32(buffer, offset);
offset += sizeof(int);
ReadOnlySpan<byte> messageData = new ReadOnlySpan<byte>(buffer, offset, length);
offset += length;
return Encoding.UTF8.GetString(messageData);
}
private static int GetLength(DateType type)
{
switch (type)
{
case DateType.Boolean:
return sizeof(bool);
case DateType.Char:
return sizeof(char);
case DateType.SByte:
case DateType.Byte:
return sizeof(byte);
case DateType.Int16:
return sizeof(Int16);
case DateType.UInt16:
return sizeof(UInt16);
case DateType.Int32:
return sizeof(Int32);
case DateType.UInt32:
return sizeof(UInt32);
case DateType.Int64:
return sizeof(Int64);
case DateType.UInt64:
return sizeof(UInt64);
case DateType.Single:
return sizeof(Single);
case DateType.Double:
return sizeof(double);
}
return -1;
}
public virtual void Dispose()
{
buffer = null;
}
}
}
Decode数据到对象列表, 然后Invoke
namespace Game
{
public class RPC : RPCBase
{
private MethodInfo _method;
private List<DateType> _types;
private List<object> _params;
private Dictionary<int, IMessage> _param;
private int _paramIndex;
public RPC(MethodInfo method)
{
this._method = method;
_types = new List<DateType>();
_params = new List<object>();
_param = new Dictionary<int, IMessage>();
}
public void AddParamType(DateType type)
{
_types?.Add(type!);
}
public void AddParam(int index, IMessage message)
{
_param[index] = message;
}
public override void Decode(byte[] buffer)
{
base.buffer = buffer;
_paramIndex = 0;
int offset = 0;
_params.Clear();
foreach (DateType type in _types)
{
DateType dateType = (DateType)buffer[offset++];
if (dateType != type)
{
LogHelper.LogError($"dateType bo equals, recv: {Enum.GetName(typeof(DateType), type)} != local: {Enum.GetName(typeof(DateType), dateType)}");
}
object obj = ToObject(dateType, ref offset);
_params.Add(obj!);
_paramIndex++;
}
_method?.Invoke(null, _params.ToArray());
}
private object ToObject(DateType type, ref int offset)
{
switch (type)
{
case DateType.Message:
IMessage message = null;
if (!_param!.TryGetValue(_paramIndex, out message))
{
LogHelper.LogError("no find message");
return null;
}
return ToMessage(ref offset, message);
case DateType.Boolean:
return ToBoolean(ref offset);
case DateType.Char:
return ToChar(ref offset);
case DateType.SByte:
case DateType.Byte:
return ToByte(ref offset);
case DateType.Int16:
return ToInt16(ref offset);
case DateType.UInt16:
return ToUInt16(ref offset);
case DateType.Int32:
return ToInt32(ref offset);
case DateType.UInt32:
return ToUInt32(ref offset);
case DateType.Int64:
return ToInt64(ref offset);
case DateType.UInt64:
return ToUInt64(ref offset);
case DateType.Single:
return ToSingle(ref offset);
case DateType.Double:
return ToDouble(ref offset);
case DateType.String:
return ToString(ref offset);
default:
LogHelper.LogError("no find dateType: " + type);
break;
}
return null;
}
public override void Dispose()
{
base.Dispose();
_method = null;
_types = null;
}
}
}
泛型Decode,然后Invokde
namespace Game
{
public class RPCStatic<T> : RPCBase
{
private Action<T> _action;
private IMessage _message;
public RPCStatic()
{
}
public virtual void Register(Action<T> action, IMessage message)
{
this._message = message;
this._action = action;
}
public override void Decode(byte[] buffer)
{
base.buffer = buffer;
int offset = 0;
DateType dateType = (DateType)buffer[offset++];
try
{
if (dateType == DateType.Message)
{
IMessage arg = ToMessage(ref offset, _message);
_action?.Invoke((T)arg);
}
else
{
LogHelper.LogError($"invoke error, type != DateType.Message, type = {dateType}");
}
}
catch (Exception ex)
{
LogHelper.LogError(ex.ToString());
}
}
public override void Dispose()
{
}
}
}
Socket.Send和Recv
使用UniTask实现的多线程异步收发消息,处理了超时重发和异常处理,接收消息时的粘包处理
namespace Game
{
public enum SocketState
{
None = 0,
Connected = 1,
Disconnected = 2,
Connecting = 3,
ConnectFailed = 4,
Close = 5,
Dispose = 6,
}
public class Tcp
{
private ConcurrentQueue<BuffMessage> _sendMsgs;
private ConcurrentQueue<BuffMessage> _receiveMsgs;
private TcpClient _tcpClient;
private SocketState _socketState;
private byte[] _recvBuff;
private int _recvOffset;
private int _delay = 10;
private CancellationTokenSource _recvCancelToken;
private CancellationTokenSource _sendCancelToken;
public SocketState State { get { return _socketState; } }
public string IP { get; set; }
public int Port { get; set; }
public NetworkStream Stream
{
get { return _tcpClient.GetStream(); }
}
public Tcp()
{
_sendMsgs = new ConcurrentQueue<BuffMessage>();
_receiveMsgs = new ConcurrentQueue<BuffMessage>();
_recvBuff = new byte[Globals.BUFFER_SIZE];
}
private void InitTcpClient()
{
_tcpClient = new TcpClient();
_recvCancelToken = new CancellationTokenSource();
_sendCancelToken = new CancellationTokenSource();
}
public void Update()
{
Profiler.BeginSample("on tcp rpc");
if (_receiveMsgs.TryDequeue(out BuffMessage msg))
{
RPCMoudle.OnRPC(msg);
GameFrame.message.PutBuffMessage(msg);
}
Profiler.EndSample();
}
public void Connect(string ip, int port)
{
IP = ip;
Port = port;
Connect();
}
public async void Connect()
{
try
{
Close();
InitTcpClient();
SetSocketState(SocketState.Connecting);
await _tcpClient.ConnectAsync(IP, Port);
OnConnect();
}
catch (Exception ex)
{
LogHelper.LogError(ex.ToString());
}
}
private void OnConnect()
{
try
{
if (_tcpClient.Connected)
{
LogHelper.Log("connected...");
SetSocketState(SocketState.Connected);
StartAsyncTasks();
}
else
{
SetSocketState(SocketState.ConnectFailed);
}
}
catch (Exception ex)
{
LogHelper.LogError("连接或通信发生错误:{0}" + ex.Message);
SetSocketState(SocketState.ConnectFailed);
}
}
private void StartAsyncTasks()
{
UniTask send = UniTask.Create(SendThread);
UniTask recv = UniTask.Create(RecvThread);
}
private async UniTask SendThread()
{
await UniTask.SwitchToThreadPool();
while (_socketState == SocketState.Connected)
{
while (true)
{
if (!_sendMsgs.TryDequeue(out BuffMessage msg))
break;
var timeoutToken = new CancellationTokenSource();
timeoutToken.CancelAfterSlim(TimeSpan.FromMilliseconds(msg.TimeoutMillisecond));
var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_sendCancelToken.Token, timeoutToken.Token);
try
{
if(_sendCancelToken.IsCancellationRequested)
break;
await Stream.WriteAsync(msg.bytes, 0, msg.length, linkedCts.Token);
LogHelper.Log($"发送完成: {msg.length} byte");
GameFrame.message.PutBuffMessage(msg);
}
catch (OperationCanceledException ex)
{
if (timeoutToken.IsCancellationRequested)
{
_sendMsgs.Enqueue(msg);
LogHelper.LogWarning("消息发送超时, 添加到队列末尾, 等待发送...");
await UniTask.Delay(10);
continue;
}
LogHelper.LogWarning("发送操作被终止..." + ex.Message);
break;
}
catch (IOException ex) when (ex.InnerException is SocketException socketEx && socketEx.SocketErrorCode == SocketError.ConnectionAborted)
{
LogHelper.Log("发送操作被终止...");
break;
}
catch (Exception ex)
{
LogHelper.LogError("发送错误: " + ex.Message);
break;
}
}
await UniTask.Delay(_delay);
}
}
private async UniTask RecvThread()
{
await UniTask.SwitchToThreadPool();
while (_socketState == SocketState.Connected)
{
try
{
if (_recvCancelToken.IsCancellationRequested)
break;
int length = await Stream.ReadAsync(_recvBuff, _recvOffset, _recvBuff.Length - _recvOffset, _recvCancelToken.Token);
if (length == 0)
{
LogHelper.Log("connect failed...");
break;
}
_recvOffset += length;
int offset = 0;
while (true)
{
if (_recvOffset - offset < sizeof(int))
// 没有足够的数据读取下一个消息的长度
break;
int dataLength = BitConverter.ToInt32(_recvBuff, offset);
if (_recvOffset - offset < dataLength + sizeof(int))
// 没有足够的数据读取完整的消息
break;
// 读取完整消息
BuffMessage msg = GameFrame.message.GetBuffMessage();
Buffer.BlockCopy(_recvBuff, offset + sizeof(int), msg.bytes, 0, dataLength);
_receiveMsgs.Enqueue(msg);
// 移动偏移量到下一个消息
offset += sizeof(int) + dataLength;
}
// 将未处理的数据移到缓冲区开头
if (_recvOffset - offset > 0)
Buffer.BlockCopy(_recvBuff, offset, _recvBuff, 0, _recvOffset - offset);
_recvOffset -= offset;
}
catch(OperationCanceledException ex)
{
LogHelper.Log("读取操作被终止: " + ex.Message);
break;
}
catch (IOException ex) when (ex.InnerException is SocketException socketEx && socketEx.SocketErrorCode == SocketError.OperationAborted)
{
LogHelper.Log("读取操作被终止...");
break;
}
catch (Exception ex)
{
LogHelper.LogError("读取错误: " + ex.ToString());
break;
}
await UniTask.Delay(_delay);
}
}
private void SetSocketState(SocketState state)
{
_socketState = state;
}
public void Send(BuffMessage message)
{
if (message.length > 0)
{
int headLength = sizeof(int);
Buffer.BlockCopy(message.bytes, 0, message.bytes, headLength, message.length);
BitConverter.TryWriteBytes(message.bytes.AsSpan(0), message.length);
message.length += headLength;
_sendMsgs.Enqueue(message);
}
else
{
GameFrame.message.PutBuffMessage(message);
}
}
public void Close()
{
if (_tcpClient == null)
return;
try
{
if (_tcpClient.Connected)
{
SetSocketState(SocketState.Close);
_recvCancelToken.Dispose();
_sendCancelToken.Dispose();
_tcpClient.Close();
}
}
catch (Exception ex)
{
LogHelper.LogError(ex.ToString());
}
}
public void Dispose()
{
Close();
if (_tcpClient != null)
{
_tcpClient.Dispose();
_tcpClient = null;
}
if (_sendMsgs != null)
{
_sendMsgs.Clear();
_sendMsgs = null;
}
if (_receiveMsgs != null)
{
_receiveMsgs.Clear();
_receiveMsgs = null;
}
SetSocketState(SocketState.Dispose);
}
}
}
项目地址
SimpleRPC