Unity Protobuf+RPC+UniTask

远程过程调用(RPC)协议详解

  • 什么是RPC协议
    • RPC的基本原理
    • RPC的关键组件
    • RPC的优缺点
    • Protobuf
    • 函数绑定
    • Call
    • Encode
    • Recv
    • Decode
    • Socket.Send和Recv
    • 项目地址

什么是RPC协议

远程过程调用(Remote Procedure Call,简称RPC)是一种网络通信协议,允许程序在不同的地址空间(通常在不同的物理计算机上)中调用彼此的方法,好像它们是在本地执行的一样。RPC隐藏了底层的网络通信细节,使开发人员能够像调用本地函数一样简单地调用远程服务。

RPC的基本原理

RPC的工作原理基于客户端-服务器模型,主要包括以下步骤:

1.客户端调用:客户端程序发起对某个远程过程的调用请求。
2.请求打包:调用参数被打包成消息,发送到服务器。
3.服务器解包和执行:服务器接收到消息,解包获取调用参数,执行相应的远程过程。
4.结果打包和返回:执行结果被打包成消息,发送回客户端。
5.客户端接收结果:客户端解包消息,获取调用结果

RPC的关键组件

1.客户端代理:负责将本地调用请求转换为远程调用请求,打包参数,并通过网络发送给服务器。
2.服务器代理:负责接收客户端的请求,解包参数,调用相应的服务方法,并将结果打包返回给客户端。
3.通信协议:定义客户端和服务器之间如何通信,常见的协议有HTTP、TCP等。
4.编解码器:负责参数和结果的序列化和反序列化,常见的格式有JSON、XML、Protobuf等。

RPC的优缺点

优点
1.简化远程调用:使得远程调用像本地调用一样简单,开发人员无需关心底层的网络通信细节。
2.语言无关:大多数RPC框架支持多种编程语言,方便不同语言的系统互操作。

缺点
1.调试困难:由于涉及网络通信,调试远程调用的问题比本地调用更加复杂。
2.可靠性要求高:需要处理网络延迟、丢包、超时等问题,增加了系统的复杂性。
3.耦合性:客户端和服务器需要共同遵循同一套接口定义,一旦接口发生变化,可能需要同时更新多个系统。

Protobuf

protoc.exe 生成C#文件
在这里插入图片描述

Gen.bat

@echo off

rem 设置路径变量
set PROTOC_PATH="protoc.exe"
set PROTO_DIR="Protos"
set OUTPUT_DIR="ProtocolCodes"

rem 创建日志头
echo .......................proto2C#.......................
echo.

rem 检查目录是否存在
if not exist %PROTO_DIR% (
    echo Error: Protocols directory does not exist.
    echo Please create the Protocols directory and place your .proto files in it.
    echo.
    pause
    exit /b
)

rem 创建输出目录
if not exist %OUTPUT_DIR% mkdir %OUTPUT_DIR%

rem 批量处理 .proto 文件
for %%f in (%PROTO_DIR%\*.proto) do (
    echo %%f complete
    %PROTOC_PATH% --proto_path=%PROTO_DIR% --csharp_out=%OUTPUT_DIR% %%f
)

echo code generation complete. Press any key to close.
pause > nul

函数绑定

1.使用反射自动获取所有RPC函数, 对其进行Hash绑定

函数的定义 RPCMsgHandles.cs

public sealed class RPCMsgHandles
{
    private static void ReqMove(int unitId, Move move)
    {

    }

    private static void RecvAttack(int skillid, Attack attack, ItemList itemList)
    {
        LogHelper.Log($"Recv: skillid = {attack.Id}, targetId = {attack.TargetId}, itemList.Count = {itemList.Items.Count}");
    }

    private static void RecvDelete(int msg)
    {
        LogHelper.Log($"Recv: state = {msg}");
    }

    private static void RecvReflectMove(Move move)
    {
        LogHelper.Log($"move reflect sync: x:{move.X}, y:{move.Y}, speed:{move.Speed}, dir:{move.Dir}");
    }
}

使用反射进行函数绑定 RPCMoudle.cs

public sealed class RPCMoudle
{
    private static Dictionary<int, IRPC> _msg = new Dictionary<int, IRPC>();

    public static void Init()
    {
        System.Type type = typeof(RPCMsgHandles);
        MethodInfo[] methods = type.GetMethods(BindingFlags.Static | BindingFlags.NonPublic);
        foreach (MethodInfo methodInfo in methods)
        {
            RPC method = new RPC(methodInfo);
            int index = 0;
            ParameterInfo[] infos = methodInfo.GetParameters();
            foreach (var info in infos)
            {
                if (typeof(IMessage).IsAssignableFrom(info.ParameterType))
                {
                    IMessage message = Activator.CreateInstance(info.ParameterType) as IMessage;
                    method.AddParamType(DateType.Message);
                    method.AddParam(index, message);
                }
                else
                {
                    DateType dateType = GetDateType(info.ParameterType);
                    method.AddParamType(dateType);
                }

                index++;
            }

            int hash = Globals.Hash(methodInfo.Name);
            if (_msg.ContainsKey(hash))
                throw new Exception("AddParamType rpc _method hash conflict: " + methodInfo.Name);

            _msg.Add(hash, method);
        }
    }
}

2.使用泛型手动进行RPC函数绑定

泛型类进行函数绑定 RPCMoudle.cs

public static void Register<T>(string methodName, Action<T> action) where T : class, IMessage, new()
{
    int id = Globals.Hash(methodName);
    RPCStatic<T> method = new RPCStatic<T>();
    method.Register(action, new T());

    if (_msg.ContainsKey(id))
    {
        LogHelper.LogError($"repeat id, id = {id}");
    }

    _msg[id] = method;
}

public static void Unregister(string methodName)
{
    int id = Globals.Hash(methodName);
    if (_msg.ContainsKey(id))
    {
        _msg.Remove(id);
    }
    else
    {
        LogHelper.LogError($"no find method, id = {id}");
    }
}

Call

Call的实现, encode数据到byte[],第一个参数必须为远程函数名字, 用于将函数名字的hashid写入数据头中, 这样远程服务器在解析数据的时候会先解析4字节的数据头表示函数的hashid

Call中的Send函数 是Socket发送协议, Send函数中会在数据头中写入数据的长度, 在接收方根据数据的长度接收完整数据 防止粘包

Call函数有多个方法重载, 根据业务需求使用
1.public static void Call(string methodName, IMessage message) 类型安全, 类型固定
2.public static void Call(string id, params object[] args) 类型不安全, 可以传入任何参数, 使用更加方便快捷

具体调用例子
Move move = new Move();
move.X = 10;
move.Y = 20;
move.Speed = 100;
move.Dir = 20;

这里使用的是object[] args 类型不安全, 也会有装箱拆箱的开销, 使用这用方式需要前后端统一类型
使用起来简单方便, 业务逻辑开发上使用较为方便
比如请求领取奖励 RPCMoudle.Call("ReqAward", 传入表奖励id);
比如请求保存勾选 RPCMoudle.Call("Save", true);
RPCMoudle.Call("ReqMove", 10016, move);

这样是类型安全的, 也不会存在装箱拆箱的开销
更加高效, 战斗场景较为适合
RPCMoudle.Call("ReqMove", move);

Call的实现, 将数据进行Encode转换成二进制

public static void Call(string methodName, IMessage message)
{
    if (message == null) 
        return;

    try
    {

        int id = Globals.Hash(methodName);
        int offset = 0;
        BuffMessage msg = GameFrame.message.GetBuffMessage();
        BitConverter.TryWriteBytes(msg.bytes.AsSpan(offset), id);
        offset += sizeof(int);

        BitConverterHelper.WriteString(msg.bytes, ref offset, GameFrame.myRole.Id);
        BitConverterHelper.WriteMessage(msg.bytes, ref offset, message);

        msg.length = offset;
        Main.Instance.Send(msg);
    }
    catch(Exception ex)
    {
        LogHelper.LogError(ex.ToString());
    }
}

public static void Call(string id, params object[] args)
{
    try
    {
        Profiler.BeginSample("rpc call");
        int hash = Globals.Hash(id);
        BuffMessage msg = Encode(hash, args);
        Main.Instance.Send(msg);
        Profiler.EndSample();
    }
    catch(Exception ex)
    {
        LogHelper.LogError(ex.ToString());
    }
}

Encode

Encode函数的实现

private static BuffMessage Encode(int id, params object[] args)
{
  int offset = 0;
  BuffMessage msg = GameFrame.message.GetBuffMessage();
  BitConverter.TryWriteBytes(msg.bytes.AsSpan(offset), id);
  offset += sizeof(int);
  BitConverterHelper.WriteString(msg.bytes, ref offset, GameFrame.myRole.Id);

  foreach (object arg in args)
  {
      try
      {
          System.Type type = arg.GetType();
          switch (arg)
          {
              case IMessage:
                  BitConverterHelper.WriteMessage(msg.bytes, ref offset, (IMessage)arg);
                  break;
              case Int16:
                  BitConverterHelper.WriteInt16(msg.bytes, ref offset, (Int16)arg);
                  break;
              case Int32:
                  BitConverterHelper.WriteInt32(msg.bytes, ref offset, (Int32)arg);
                  break;
              case Int64:
                  BitConverterHelper.WriteInt64(msg.bytes, ref offset, (Int64)arg);
                  break;
              case UInt16:
                  BitConverterHelper.WriteUInt16(msg.bytes, ref offset, (UInt16)arg);
                  break;
              case UInt32:
                  BitConverterHelper.WriteUInt32(msg.bytes, ref offset, (UInt32)arg);
                  break;
              case UInt64:
                  BitConverterHelper.WriteUInt64(msg.bytes, ref offset, (UInt64)arg);
                  break;
              case bool:
                  BitConverterHelper.WriteBool(msg.bytes, ref offset, (bool)arg);
                  break;
              case Byte:
                  BitConverterHelper.WriteByte(msg.bytes, ref offset, (byte)arg);
                  break;
              case SByte:
                  BitConverterHelper.WriteByte(msg.bytes, ref offset, (byte)arg);
                  break;
              case Char:
                  BitConverterHelper.WriteChar(msg.bytes, ref offset, (Char)arg);
                  break;
              case Single:
                  BitConverterHelper.WriteSingle(msg.bytes, ref offset, (Single)arg);
                  break;
              case Double:
                  BitConverterHelper.WriteDouble(msg.bytes, ref offset, (Double)arg);
                  break;
              case string:
                  BitConverterHelper.WriteString(msg.bytes, ref offset, (string)arg);
                  break;
          }
      }
      catch(Exception ex)
      {
          LogHelper.LogError($"id: {id}, " + ex.ToString());
          msg.Dispose();
          return msg;
      }
  }

  msg.length = offset;
  return msg;
}

0GC的TryWriteBytes方案

namespace Game
{
    public static class BitConverterHelper
    {
        private static readonly int BUFFER_SIZE = 1024 * 1024;
        private static readonly byte[] buffer = new byte[BUFFER_SIZE];
        private static CodedOutputStream _stream;
        private static Stopwatch _watch;

        public static void Init()
        {
            CreateStream();
            _watch = new Stopwatch();
            _watch.Start();
        }

        private static void CreateStream()
        {
            if (_stream != null)
                _stream.Dispose();

            if (_watch != null)
            {
                _watch.Stop();
                LogHelper.LogWarning($"create stream interval time: {_watch.ElapsedMilliseconds / 1000.0f} s");
                _watch.Restart();
            }

            _stream = new CodedOutputStream(buffer);
        }

        private static Span<byte> ToByteArray(IMessage message)
        {
            if (message == null)
                return new byte[0];

            int length = message.CalculateSize();
            if (length == 0)
                return new byte[0];

            if (length >= BUFFER_SIZE)
            {
                throw new Exception($"overflow: message length >= {BUFFER_SIZE}");
            }

            if (_stream.Position + length >= BUFFER_SIZE)
                CreateStream();

            int position = (int)_stream.Position;
            message.WriteTo(_stream);
            return buffer.AsSpan(position, length);
        }

        public static void WriteInt16(byte[] buffer, ref int offset, Int16 arg)
        {
            Check(buffer, offset + 1);
            buffer[offset++] = (byte)DateType.Int16;
            Check(buffer, offset + sizeof(Int16));
            BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
            offset += sizeof(Int16);
        }

        public static void WriteInt32(byte[] buffer, ref int offset, Int32 arg)
        {
            Check(buffer, offset + 1);
            buffer[offset++] = (byte)DateType.Int32;
            Check(buffer, offset + sizeof(Int32));
            BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
            offset += sizeof(Int32);
        }

        public static void WriteInt64(byte[] buffer, ref int offset, Int64 arg)
        {
            Check(buffer, offset + 1);
            buffer[offset++] = (byte)DateType.Int64;
            Check(buffer, offset + sizeof(Int64));
            BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
            offset += sizeof(Int64);
        }

        public static void WriteUInt16(byte[] buffer, ref int offset, UInt16 arg)
        {
            Check(buffer, offset + 1);
            buffer[offset++] = (byte)DateType.UInt16;
            Check(buffer, offset + sizeof(UInt16));
            BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
            offset += sizeof(UInt16);
        }

        public static void WriteUInt32(byte[] buffer, ref int offset, UInt32 arg)
        {
            Check(buffer, offset + 1);
            buffer[offset++] = (byte)DateType.UInt32;
            Check(buffer, offset + sizeof(UInt32));
            BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
            offset += sizeof(UInt32);
        }

        public static void WriteUInt64(byte[] buffer, ref int offset, UInt64 arg)
        {
            Check(buffer, offset + 1);
            buffer[offset++] = (byte)DateType.UInt64;
            Check(buffer, offset + sizeof(UInt64));
            BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
            offset += sizeof(UInt64);
        }

        public static void WriteBool(byte[] buffer, ref int offset, bool arg)
        {
            Check(buffer, offset + 1);
            buffer[offset++] = (byte)DateType.Boolean;
            Check(buffer, offset + sizeof(bool));
            BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
            offset += sizeof(bool);
        }

        public static void WriteByte(byte[] buffer, ref int offset, byte arg)
        {
            Check(buffer, offset + 1);
            buffer[offset++] = (byte)DateType.Byte;
            Check(buffer, offset + 1);
            buffer[offset++] = arg;
        }

        public static void WriteChar(byte[] buffer, ref int offset, Char arg)
        {
            Check(buffer, offset + 1);
            buffer[offset++] = (byte)DateType.Char;
            Check(buffer, offset + sizeof(Char));
            BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
            offset += sizeof(Char);
        }

        public static void WriteSingle(byte[] buffer, ref int offset, Single arg)
        {
            Check(buffer, offset + 1);
            buffer[offset++] = (byte)DateType.Single;
            Check(buffer, offset + sizeof(Single));
            BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
            offset += sizeof(Single);
        }

        public static void WriteDouble(byte[] buffer, ref int offset, Double arg)
        {
            Check(buffer, offset + 1);
            buffer[offset++] = (byte)DateType.Double;
            Check(buffer, offset + sizeof(Double));
            BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
            offset += sizeof(Double);
        }

        public static void WriteString(byte[] buffer, ref int offset, string arg)
        {
            Check(buffer, offset + 1);
            buffer[offset++] = (byte)DateType.String;
            byte[] bytes = Encoding.UTF8.GetBytes(arg);
            Check(buffer, offset + bytes.Length);
            BitConverter.TryWriteBytes(buffer.AsSpan(offset), bytes.Length);
            offset += sizeof(int);
            Span<byte> target = new Span<byte>(buffer, offset, buffer.Length - offset);
            bytes.CopyTo(target);
            offset += bytes.Length;
        }

        public static void WriteMessage(byte[] buffer, ref int offset, IMessage arg)
        {
            IMessage message = arg;
            Span<byte> bytes = ToByteArray(message);
            Check(buffer, offset + 1);
            buffer[offset++] = (byte)DateType.Message;
            Check(buffer, offset + bytes.Length);
            BitConverter.TryWriteBytes(buffer.AsSpan(offset), bytes.Length);
            offset += sizeof(int);
            Span<byte> target = new Span<byte>(buffer, offset, bytes.Length);
            bytes.CopyTo(target);
            offset += bytes.Length;
        }

        private static void Check(byte[] buffer, int offset)
        {
            if (offset >= buffer.Length)
                throw new Exception($"date length: {offset} > {Globals.DATA_SZIE}, Invalid data!!");
        }

        public static void Dispose()
        {
            _stream?.Dispose();
            _stream = null;
        }
    }
}

Recv

Decode 数据解析,调用本地方法


public static void OnRPC(BuffMessage msg)
{
    if(msg == null)
    {
        LogHelper.LogError("socket recv error, msg == null");
        return;
    }

    Decode(msg.bytes);
}

Decode

0GC的Decode方案

private static void Decode(byte[] buffer)
{
    if (buffer == null || buffer.Length < sizeof(int))
    {
        LogHelper.LogError("Invalid buffer received");
        return;
    }

    int protoId = BitConverter.ToInt32(buffer, 0);
    if (!_msg.TryGetValue(protoId, out IRPC method))
    {
        LogHelper.LogError($"Method not found for protoId: {protoId}");
        return;
    }

    BuffMessage buffMessage = GameFrame.message.GetBuffMessage();
    try
    {
        Array.Copy(buffer, sizeof(int), buffMessage.bytes, 0, buffer.Length - sizeof(int));
        method.Decode(buffMessage.bytes);
    }
    catch (Exception ex)
    {
        LogHelper.LogError($"Error invoking method for protoId {protoId}: {ex.Message}");
    }
    finally
    {
        GameFrame.message.PutBuffMessage(buffMessage);
    }
}
namespace Game
{
    public interface IRPC : IDisposable
    {
        public void Decode(byte[] buffer);
    }

    public abstract class RPCBase : IRPC
    {
        protected byte[] buffer;
        public abstract void Decode(byte[] buffer);

        protected ReadOnlySpan<byte> ReadData(DateType type, ref int offset)
        {
            ReadOnlySpan<byte> data = null;
            int length = GetLength(type);
            if (length > 0)
            {
                data = new ReadOnlySpan<byte>(buffer, offset, length);
                offset += length;
            }

            return data;
        }

        protected bool ToBoolean(ref int offset)
        {
            ReadOnlySpan<byte> data = ReadData(DateType.Boolean, ref offset);
            return BitConverter.ToBoolean(data);
        }

        protected Byte ToByte(ref int offset)
        {
            ReadOnlySpan<byte> data = ReadData(DateType.Char, ref offset);
            return data[0];
        }

        protected char ToChar(ref int offset)
        {
            ReadOnlySpan<byte> data = ReadData(DateType.Char, ref offset);
            return BitConverter.ToChar(data);
        }

        protected Int16 ToInt16(ref int offset)
        {
            ReadOnlySpan<byte> data = ReadData(DateType.Int16, ref offset);
            return BitConverter.ToInt16(data);
        }

        protected UInt16 ToUInt16(ref int offset)
        {
            ReadOnlySpan<byte> data = ReadData(DateType.UInt16, ref offset);
            return BitConverter.ToUInt16(data);
        }

        protected Int32 ToInt32(ref int offset)
        {
            ReadOnlySpan<byte> data = ReadData(DateType.Int32, ref offset);
            return BitConverter.ToInt32(data);
        }

        protected UInt32 ToUInt32(ref int offset)
        {
            ReadOnlySpan<byte> data = ReadData(DateType.UInt32, ref offset);
            return BitConverter.ToUInt32(data);
        }

        protected Int64 ToInt64(ref int offset)
        {
            ReadOnlySpan<byte> data = ReadData(DateType.Int64, ref offset);
            return BitConverter.ToInt64(data);
        }

        protected UInt64 ToUInt64(ref int offset)
        {
            ReadOnlySpan<byte> data = ReadData(DateType.UInt64, ref offset);
            return BitConverter.ToUInt64(data);
        }

        protected Single ToSingle(ref int offset)
        {
            ReadOnlySpan<byte> data = ReadData(DateType.Single, ref offset);
            return BitConverter.ToSingle(data);
        }

        protected Double ToDouble(ref int offset)
        {
            ReadOnlySpan<byte> data = ReadData(DateType.Double, ref offset);
            return BitConverter.ToDouble(data);
        }

        protected string ToString(ref int offset)
        {
            ReadOnlySpan<byte> data = ReadData(DateType.String, ref offset);
            return DecodeString(ref offset);
        }

        protected IMessage ToMessage(ref int offset, IMessage message)
        {
            ReadOnlySpan<byte> data = ReadData(DateType.Message, ref offset);
            return DecodeMessage(ref offset, message);
        }

        private IMessage DecodeMessage(ref int offset, IMessage message)
        {
            int length = BitConverter.ToInt32(buffer, offset);
            offset += sizeof(int);

            ReadOnlySpan<byte> messageData = new ReadOnlySpan<byte>(buffer, offset, length);
            offset += length;

            return message.Descriptor.Parser.ParseFrom(messageData)!;
        }

        private string DecodeString(ref int offset)
        {
            int length = BitConverter.ToInt32(buffer, offset);
            offset += sizeof(int);

            ReadOnlySpan<byte> messageData = new ReadOnlySpan<byte>(buffer, offset, length);
            offset += length;

            return Encoding.UTF8.GetString(messageData);
        }

        private static int GetLength(DateType type)
        {
            switch (type)
            {
                case DateType.Boolean:
                    return sizeof(bool);
                case DateType.Char:
                    return sizeof(char);
                case DateType.SByte:
                case DateType.Byte:
                    return sizeof(byte);
                case DateType.Int16:
                    return sizeof(Int16);
                case DateType.UInt16:
                    return sizeof(UInt16);
                case DateType.Int32:
                    return sizeof(Int32);
                case DateType.UInt32:
                    return sizeof(UInt32);
                case DateType.Int64:
                    return sizeof(Int64);
                case DateType.UInt64:
                    return sizeof(UInt64);
                case DateType.Single:
                    return sizeof(Single);
                case DateType.Double:
                    return sizeof(double);
            }

            return -1;
        }

        public virtual void Dispose()
        {
            buffer = null;
        }
    }
}

Decode数据到对象列表, 然后Invoke

namespace Game
{
    public class RPC : RPCBase
    {
        private MethodInfo _method;
        private List<DateType> _types;
        private List<object> _params;
        private Dictionary<int, IMessage> _param;
        private int _paramIndex;

        public RPC(MethodInfo method)
        {
            this._method = method;
            _types = new List<DateType>();
            _params = new List<object>();
            _param = new Dictionary<int, IMessage>();
        }

        public void AddParamType(DateType type)
        {
            _types?.Add(type!);
        }

        public void AddParam(int index, IMessage message)
        {
            _param[index] = message;
        }

        public override void Decode(byte[] buffer)
        {
            base.buffer = buffer;
            _paramIndex = 0;
            int offset = 0;

            _params.Clear();
            foreach (DateType type in _types)
            {
                DateType dateType = (DateType)buffer[offset++];
                if (dateType != type)
                {
                    LogHelper.LogError($"dateType bo equals, recv: {Enum.GetName(typeof(DateType), type)} != local: {Enum.GetName(typeof(DateType), dateType)}");
                }

                object obj = ToObject(dateType, ref offset);
                _params.Add(obj!);
                _paramIndex++;
            }

            _method?.Invoke(null, _params.ToArray());
        }

        private object ToObject(DateType type, ref int offset)
        {
            switch (type)
            {
                case DateType.Message:
                    IMessage message = null;
                    if (!_param!.TryGetValue(_paramIndex, out message))
                    {
                        LogHelper.LogError("no find message");
                        return null;
                    }
                        
                    return ToMessage(ref offset, message);
                case DateType.Boolean:
                    return ToBoolean(ref offset);
                case DateType.Char:
                    return ToChar(ref offset);
                case DateType.SByte:
                case DateType.Byte:
                    return ToByte(ref offset);
                case DateType.Int16:
                    return ToInt16(ref offset);
                case DateType.UInt16:
                    return ToUInt16(ref offset);
                case DateType.Int32:
                    return ToInt32(ref offset);
                case DateType.UInt32:
                    return ToUInt32(ref offset);
                case DateType.Int64:
                    return ToInt64(ref offset);
                case DateType.UInt64:
                    return ToUInt64(ref offset);
                case DateType.Single:
                    return ToSingle(ref offset);
                case DateType.Double:
                    return ToDouble(ref offset);
                case DateType.String:
                    return ToString(ref offset);
                default:
                    LogHelper.LogError("no find dateType: " + type);
                    break;
            }

            return null;
        }

        public override void Dispose()
        {
            base.Dispose();
            _method = null;
            _types = null;
        }
    }
}

泛型Decode,然后Invokde

namespace Game
{
    public class RPCStatic<T> : RPCBase
    {
        private Action<T> _action;
        private IMessage _message;

        public RPCStatic()
        {
        }

        public virtual void Register(Action<T> action, IMessage message)
        {
            this._message = message;
            this._action = action;
        }

        public override void Decode(byte[] buffer)
        {
            base.buffer = buffer;
            int offset = 0;
            DateType dateType = (DateType)buffer[offset++];

            try
            {
                if (dateType == DateType.Message)
                {
                    IMessage arg = ToMessage(ref offset, _message);
                    _action?.Invoke((T)arg);
                }
                else
                {
                    LogHelper.LogError($"invoke error, type != DateType.Message, type = {dateType}");
                }
            }
            catch (Exception ex)
            {
                LogHelper.LogError(ex.ToString());
            }
        }

        public override void Dispose()
        {
        }
    }
}

Socket.Send和Recv

使用UniTask实现的多线程异步收发消息,处理了超时重发和异常处理,接收消息时的粘包处理

namespace Game
{
    public enum SocketState
    {
        None = 0,
        Connected = 1,
        Disconnected = 2,
        Connecting = 3,
        ConnectFailed = 4,
        Close = 5,
        Dispose = 6,
    }

    public class Tcp
    {
        private ConcurrentQueue<BuffMessage> _sendMsgs;
        private ConcurrentQueue<BuffMessage> _receiveMsgs;
        private TcpClient _tcpClient;

        private SocketState _socketState;
        private byte[] _recvBuff;
        private int _recvOffset;
        private int _delay = 10;
        private CancellationTokenSource _recvCancelToken;
        private CancellationTokenSource _sendCancelToken;

        public SocketState State { get { return _socketState; } }
        public string IP { get; set; }
        public int Port { get; set; }

        public NetworkStream Stream
        {
            get { return _tcpClient.GetStream(); }
        }

        public Tcp()
        {
            _sendMsgs = new ConcurrentQueue<BuffMessage>();
            _receiveMsgs = new ConcurrentQueue<BuffMessage>();
            _recvBuff = new byte[Globals.BUFFER_SIZE];
        }

        private void InitTcpClient()
        {
            _tcpClient = new TcpClient();
            _recvCancelToken = new CancellationTokenSource();
            _sendCancelToken = new CancellationTokenSource();
        }

        public void Update()
        {
            Profiler.BeginSample("on tcp rpc");
            if (_receiveMsgs.TryDequeue(out BuffMessage msg))
            {
                RPCMoudle.OnRPC(msg);
                GameFrame.message.PutBuffMessage(msg);
            }
            Profiler.EndSample();
        }

        public void Connect(string ip, int port)
        {
            IP = ip;
            Port = port;
            Connect();
        }

        public async void Connect()
        {
            try
            {
                Close();
                InitTcpClient();
                SetSocketState(SocketState.Connecting);
                await _tcpClient.ConnectAsync(IP, Port);
                OnConnect();
            }
            catch (Exception ex)
            {
                LogHelper.LogError(ex.ToString());
            }
        }

        private void OnConnect()
        {
            try
            {
                if (_tcpClient.Connected)
                {
                    LogHelper.Log("connected...");
                    SetSocketState(SocketState.Connected);
                    StartAsyncTasks();
                }
                else
                {
                    SetSocketState(SocketState.ConnectFailed);
                }
            }
            catch (Exception ex)
            {
                LogHelper.LogError("连接或通信发生错误:{0}" + ex.Message);
                SetSocketState(SocketState.ConnectFailed);
            }
        }

        private void StartAsyncTasks()
        {
            UniTask send = UniTask.Create(SendThread);
            UniTask recv = UniTask.Create(RecvThread);
        }

        private async UniTask SendThread()
        {
            await UniTask.SwitchToThreadPool();
            while (_socketState == SocketState.Connected)
            {
                while (true)
                {
                    if (!_sendMsgs.TryDequeue(out BuffMessage msg))
                        break;

                    var timeoutToken = new CancellationTokenSource();
                    timeoutToken.CancelAfterSlim(TimeSpan.FromMilliseconds(msg.TimeoutMillisecond));
                    var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_sendCancelToken.Token, timeoutToken.Token);

                    try
                    {
                        if(_sendCancelToken.IsCancellationRequested)
                            break;

                        await Stream.WriteAsync(msg.bytes, 0, msg.length, linkedCts.Token);
                        LogHelper.Log($"发送完成: {msg.length} byte");
                        GameFrame.message.PutBuffMessage(msg);
                    }
                    catch (OperationCanceledException ex)
                    {
                        if (timeoutToken.IsCancellationRequested)
                        {
                            _sendMsgs.Enqueue(msg);
                            LogHelper.LogWarning("消息发送超时, 添加到队列末尾, 等待发送...");
                            await UniTask.Delay(10);
                            continue;
                        }

                        LogHelper.LogWarning("发送操作被终止..." + ex.Message);
                        break;
                    }
                    catch (IOException ex) when (ex.InnerException is SocketException socketEx && socketEx.SocketErrorCode == SocketError.ConnectionAborted)
                    {
                        LogHelper.Log("发送操作被终止...");
                        break;
                    }
                    catch (Exception ex)
                    {
                        LogHelper.LogError("发送错误: " + ex.Message);
                        break;
                    }
                }

                await UniTask.Delay(_delay);
            }
        }

        private async UniTask RecvThread()
        {
            await UniTask.SwitchToThreadPool();
            while (_socketState == SocketState.Connected)
            {
                try
                {
                    if (_recvCancelToken.IsCancellationRequested) 
                        break;

                    int length = await Stream.ReadAsync(_recvBuff, _recvOffset, _recvBuff.Length - _recvOffset, _recvCancelToken.Token);
                    if (length == 0)
                    {
                        LogHelper.Log("connect failed...");
                        break;
                    }

                    _recvOffset += length;
                    int offset = 0;
                    while (true)
                    {
                        if (_recvOffset - offset < sizeof(int))
                            // 没有足够的数据读取下一个消息的长度
                            break;

                        int dataLength = BitConverter.ToInt32(_recvBuff, offset);
                        if (_recvOffset - offset < dataLength + sizeof(int))
                            // 没有足够的数据读取完整的消息
                            break;

                        // 读取完整消息
                        BuffMessage msg = GameFrame.message.GetBuffMessage();
                        Buffer.BlockCopy(_recvBuff, offset + sizeof(int), msg.bytes, 0, dataLength);
                        _receiveMsgs.Enqueue(msg);

                        // 移动偏移量到下一个消息
                        offset += sizeof(int) + dataLength;
                    }

                    // 将未处理的数据移到缓冲区开头
                    if (_recvOffset - offset > 0)
                        Buffer.BlockCopy(_recvBuff, offset, _recvBuff, 0, _recvOffset - offset);

                    _recvOffset -= offset;
                }
                catch(OperationCanceledException ex)
                {
                    LogHelper.Log("读取操作被终止: " + ex.Message);
                    break;
                }
                catch (IOException ex) when (ex.InnerException is SocketException socketEx && socketEx.SocketErrorCode == SocketError.OperationAborted)
                {
                    LogHelper.Log("读取操作被终止...");
                    break;
                }
                catch (Exception ex)
                {
                    LogHelper.LogError("读取错误: " + ex.ToString());
                    break;
                }

                await UniTask.Delay(_delay);
            }
        }

        private void SetSocketState(SocketState state)
        {
            _socketState = state;
        }

        public void Send(BuffMessage message)
        {
            if (message.length > 0)
            {
                int headLength = sizeof(int);
                Buffer.BlockCopy(message.bytes, 0, message.bytes, headLength, message.length);
                BitConverter.TryWriteBytes(message.bytes.AsSpan(0), message.length);
                message.length += headLength;
                _sendMsgs.Enqueue(message);
            }
            else
            {
                GameFrame.message.PutBuffMessage(message);
            }
        }

        public void Close()
        {
            if (_tcpClient == null)
                return;

            try
            {
                if (_tcpClient.Connected)
                {
                    SetSocketState(SocketState.Close);
                    _recvCancelToken.Dispose();
                    _sendCancelToken.Dispose();
                    _tcpClient.Close();
                }
            }
            catch (Exception ex)
            {
                LogHelper.LogError(ex.ToString());
            }
        }

        public void Dispose()
        {
            Close();
        
            if (_tcpClient != null)
            {
                _tcpClient.Dispose();
                _tcpClient = null;
            }

            if (_sendMsgs != null)
            {
                _sendMsgs.Clear();
                _sendMsgs = null;
            }

            if (_receiveMsgs != null)
            {
                _receiveMsgs.Clear();
                _receiveMsgs = null;
            }

            SetSocketState(SocketState.Dispose);
        }
    }
}

项目地址

SimpleRPC

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/707354.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C语言| 把数组a赋给数组b

把数组a赋给数组b, 正确的写法是用for循环&#xff0c;将数组a中的元素一个一个赋给数组b的元素。 #include <stdio.h> int main(void) { int a[5] {11, 22, 33, 44, 55}; int b[5]; int i; for(i0; i<5; i) { b[i] a[i]; printf(…

手机连接ESP8266的WIFI,进入内置网页,输入要显示的内容,在OLED显示屏上显示文本

在这篇技术博客中&#xff0c;我们将探讨如何使用ESP8266 Wi-Fi 模块和SSD1306 OLED显示屏&#xff0c;构建一个简易的信息显示和交互系统。此系统能够让用户通过一个简单的Web界面输入信息&#xff0c;并将其显示在OLED屏幕上。这种设备的应用非常广泛&#xff0c;可以用于智能…

多应用对接企业微信授权和扫码登录

多应用对接企业微信授权和扫码登录是一种常见的企业级解决方案&#xff0c;它可以帮助企业实现统一的身份验证和管理&#xff0c;提升用户体验和安全性。本文将介绍如何实现多应用对接企业微信授权和扫码登录的方法和步骤。 # 第一步&#xff1a;注册企业微信开放平台应用 首…

Java——IO流(一)-(4/8):前置知识-字符集、UTF-8、GBK、ASCII、乱码问题、编码和解码等

目录 常见字符集介绍 标准ASCII字符集 GBK&#xff08;汉字内码扩展规范&#xff0c;国标&#xff09; Unicode字符集&#xff08;统一码&#xff0c;万国码&#xff09; 小结 字符集的编码、解码操作 方法 实例演示 常见字符集介绍 标准ASCII字符集 ASCll(American St…

u-boot(四)-顶层目录链接脚本文件(u-boot.lds)介绍

一&#xff0c;IMX6ULL映像文件 1&#xff0c;格式概述 对于IMX6ULL&#xff0c;烧写在EMMC、SD/TF卡上的程序&#xff0c;并不能“自己复制自己”&#xff0c;是“别人把它复制到内存里”。一上电首先运行的是boot ROM上的程序&#xff0c;它从EMMC、SD/TF卡上把程序复制进内…

京准电钟 NTP时间同步服务器助力水库水坝水利自动化建设

京准电钟 NTP时间同步服务器助力水库水坝水利自动化建设 京准电钟 NTP时间同步服务器助力水库水坝水利自动化建设 水库大坝监测系统主要包括渗流监测系统、流量监测系统、雨量监测系统、沉降监测系统组成。每一个监测系统由监测仪器及自动化数据采集装置&#xff08;内置通信装…

AI引领项目管理新时代:效率与智能并驾齐驱

在数字化浪潮的推动下&#xff0c;项目管理领域正迎来一场由AI技术引领的革新。从自动化任务执行到智能决策支持&#xff0c;AI技术的应用正让项目管理变得更加高效、精准和智能化。本文将探讨项目管理人员及其实施团队如何运用AI技术&#xff0c;以及这些技术如何助力项目管理…

如何用Xinstall CPS结算系统打破传统营销桎梏,实现用户增长?

在互联网流量红利逐渐衰退的今天&#xff0c;App推广和运营面临着前所未有的挑战。如何快速搭建起满足用户需求的运营体系&#xff0c;成为了众多企业急待解决的问题。而在这个关键时刻&#xff0c;Xinstall CPS结算系统应运而生&#xff0c;以其独特的优势帮助企业解决了一系列…

iptables教程

1.iptables安装 1.1 iptables和iptables-service的关系 iptables 是基于内核的&#xff0c;和 iptables-services 没有关系&#xff0c;不用安装任何工具包就可以使用 iptable 命令添加的防火墙规则&#xff0c; 但是iptables添加的规则是临时的&#xff0c;基于内存的&…

SK海力士计划于2024年第四季度启动GDDR7大规模生产

SK海力士&#xff0c;作为HBM市场的领头羊&#xff0c;于6月13日宣布&#xff0c;公司目标于2024年第四季度开始其GDDR7芯片的大规模生产。 与此同时&#xff0c;美光科技在Computex展会上也宣布推出其GDDR7图形内存&#xff0c;目前正处于样品测试阶段。据AnandTech报道&#…

CRC计算单元

CRC计算单元 CRC是Cyclic Redundancy Check,循环冗余校验的缩写. 是一种检测数据错误的技术,主要用在数据通信和数据存储的方面. 但是这种技术只能检测到传输或存储的数据是否有误,没有将错误纠正的功能. 而CRC计算单元是一个独立的具备CRC计算功能的外设. AT32 MCU片上CRC计…

当JS遇上NLP:开启图片分析的奇幻之旅

前言 在当今科技飞速发展的时代&#xff0c;JavaScript&#xff08;JS&#xff09;作为广泛应用的编程语言&#xff0c;展现出了强大的活力与无限的可能性。与此同时&#xff0c;自然语言处理&#xff08;NLP&#xff09;领域也正在经历着深刻的变革与进步。 当这两者碰撞在一…

phpcms仿蚁乐购淘宝客网站模板

phpcms仿蚁乐购网站模板&#xff0c;淘宝客行业模板免费下载&#xff0c;该模板网站很容易吸引访客点击&#xff0c;提升ip流量和pv是非常有利的。本套模板采用现在非常流行的全屏自适应布局设计&#xff0c;且栏目列表以简洁&#xff0c;非常时尚大气。页面根据分辨率大小而自…

《现代通信原理与技术》--数字信号的最佳接收实验报告

《现代通信原理与技术》 数字信号的最佳接收实验报告 实 验&#xff1a;数字信号的最佳接收实验报告 目录 摘要......................................................................................................3 引言........................................…

Linux---系统的初步学习【 项目二 管理Linux文件和目录】

项目二 管理Linux文件和目录 2.1项目知识准备 ​ 文件是存储在计算机上的数据集合。在Windows系统中&#xff0c;我们理解的文件可以是文本文档、图片、程序、音乐、视频等。在Linux中&#xff0c;一切皆文件&#xff0c;也就是除了Windows中所理解的文件&#xff0c;目录、字…

如何申请小程序SSL证书

在互联网时代&#xff0c;数据安全和用户隐私保护变得尤为重要。SSL证书作为网站、应用或小程序与用户之间建立安全连接的关键工具&#xff0c;其重要性不言而喻。SSL证书能够加密数据传输&#xff0c;防止信息被窃取&#xff0c;提升用户信任度&#xff0c;对于小程序开发者来…

u-boot(六) - 详细启动流程

一&#xff0c;u-boot启动第一阶段 1&#xff0c;启动流程 ENTRY(_start) //arch/arm/lib/vectors.S ----b resets //arch/arm/cpu/armv7/start.S --------b save_boot_params ------------b save_boot_params_ret //将cpu的工作模式设置为SVC32模式&#xff08;即管理模式&a…

【PHP开发工程师系统性教学】——thinkPHP的分页操作,不管有几页,用这个你想分几页就分几页

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;开发者-曼亿点 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 曼亿点 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a…

力扣384. 打乱数组

Problem: 384. 打乱数组 文章目录 题目描述思路复杂度Code 题目描述 思路 打乱数组的主要算法&#xff1a; 从1 - n每次生成[i ~ n - i]的一个随机数字&#xff0c;再将原数组下标位置为i的元素和该随机数字位置的元素交换 复杂度 打乱数组的主要算法 时间复杂度: O ( n ) O(…

YOLOv8可视化界面,web网页端检测

YOLOv8可视化界面&#xff0c;web网页端检测。支持图片检测&#xff0c;视频检测&#xff0c;摄像头检测等&#xff0c;支持检测、分割等多种任务&#xff0c;实时显示检测画面。支持自定义数据集&#xff0c;计数&#xff0c;……
最新文章