Golang 原生Rpc Server实现

Golang 原生Rpc Server实现

  • 引言
  • 源码解析
    • 服务端
      • 数据结构
      • 服务注册
      • 请求处理
    • 客户端
      • 数据结构
      • 建立连接
      • 请求调用
  • 延伸
    • 异步调用
    • 定制服务名
    • 采用TPC协议建立连接
    • 自定义编码格式
    • 自定义服务器
  • 参考


引言

本文我们来看看golang原生rpc库的实现 , 首先来看一下golang rpc库的demo案例:

  • 服务端和客户端公共代码
type HelloService interface {
	Hello(request *Request, response *Response) error
}

type Request struct {
	Header map[string]interface{}
	Params map[string]interface{}
}

type Response struct {
	Header map[string]interface{}
	Params map[string]interface{}
}
  • 服务端代码
type HelloServiceImpl int

func NewServer() {
	helloImpl := new(HelloServiceImpl)
	rpc.RegisterName("helloService", helloImpl)
	rpc.HandleHTTP()
	if err := http.ListenAndServe(":1235", nil); err != nil {
		log.Fatal("server error: ", err)
	}
}

func (s *HelloServiceImpl) Hello(request *common.Request, response *common.Response) error {
	response.Header = request.Header
	response.Params = map[string]interface{}{
		"data": "Hello World",
	}
	return nil
}
  • 客户端代码
func NewClient() *common.Response {
	client, err := rpc.DialHTTP("tcp", ":1234")
	if err != nil {
		log.Fatal("dialing: ", err)
	}

	res := &common.Response{}

	err = client.Call("helloService.Hello", &common.Request{
		map[string]interface{}{
			"client": "val1",
		}, map[string]interface{}{
			"data": "hello world",
		},
	}, res)

	if err != nil {
		log.Fatal("call: ", err)
	}
	return res
}

golang 原生 rpc 库的使用基本还是分为两步走:

server 端 :

  1. 服务注册
  2. 启动服务

server端对注册的方法有一定的限制,方法必须满足签名:

func (t *T) MethodName(argType T1, replyType *T2) error
  • 首先,方法必须是导出的(名字首字母大写);
  • 其次,方法接受两个参数,必须是导出的或内置类型。第一个参数表示客户端传递过来的请求参数,第二个是需要返回给客户端的响应。第二个参数必须为指针类型(需要修改);
  • 最后,方法必须返回一个error类型的值。返回非nil的值,表示调用出错。

rpc.HandleHTTP()注册 HTTP 路由。http.ListenAndServe(“:1234”, nil)在端口1234上启动一个 HTTP 服务,请求 rpc 方法会交给rpc内部路由处理。这样我们就可以通过客户端调用这两个方法了。


client 端 :

  1. 连接服务端
  2. 调用接口

客户端比服务端稍微简单一点,我们使用rpc.DialHTTP(“tcp”, “:1234”)连接到服务端的监听地址,返回一个 rpc 的客户端对象。后续就可以调用该对象的Call()方法调用服务端对象的对应方法,依次传入方法名(需要加上类型限定)、参数、一个指针(用于接收返回值)


源码解析

对net/http包不熟悉的童鞋可能会觉得奇怪,rpc.HandleHTTP()与http.ListenAndServer(“:1234”, nil)是怎么联系起来的?我们简单看一下源码:

// src/net/rpc/server.go
const (
  // Defaults used by HandleHTTP
  DefaultRPCPath   = "/_goRPC_"
  DefaultDebugPath = "/debug/rpc"
)

func (server *Server) HandleHTTP(rpcPath, debugPath string) {
  http.Handle(rpcPath, server)
  http.Handle(debugPath, debugHTTP{server})
}

func HandleHTTP() {
  DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath)
}

实际上,rpc.HandleHTTP()会调用http.Handle()在预定义的路径上(/_goRPC_)注册处理器。这个处理器最终被添加到net/http包中的默认多路复用器上:

// src/net/http/server.go
func Handle(pattern string, handler Handler) {
  DefaultServeMux.Handle(pattern, handler)
}

而http.ListenAndServer()第二个参数传入nil时也是使用默认的多路复用器。

有关golang http server 实现,可阅读:

  • Golang 原生Http Server实现

细心的朋友可能发现了,除了默认的路径/_goRPC_用来处理 RPC 请求,rpc.HandleHTTP()方法还注册了一个调试路径/debug/rpc。我们可以直接在浏览器中访问这个网址(需要服务端程序开启。如果服务端在远程,需要相应地修改地址)localhost:1234,直观的查看各个方法的调用情况:

在这里插入图片描述


当我们访问/_goRPC_路径 , 最终调用到的请求处理器是net/rpc/server包下的ServerHttp函数:

func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if req.Method != "CONNECT" {
		w.Header().Set("Content-Type", "text/plain; charset=utf-8")
		w.WriteHeader(http.StatusMethodNotAllowed)
		io.WriteString(w, "405 must CONNECT\n")
		return
	}
	// 拦截http连接拦截,获取原生的connection
	conn, _, err := w.(http.Hijacker).Hijack()
	...
	io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
	// 连接上后续的数据读写都走rpc 协议 , 不走 http 协议了
	server.ServeConn(conn)
}

服务端

数据结构

首先来看一下承载Rpc服务核心状态的Server结构体实现:

// Server represents an RPC Server.
type Server struct {
	serviceMap sync.Map   // map[string]*service. 服务信息映射集合
	reqLock    sync.Mutex // protects freeReq.  
	freeReq    *Request 
	respLock   sync.Mutex // protects freeResp
	freeResp   *Response
}

其次是包含了注册服务信息的service结构体实现:

type service struct {
	name   string                 // 服务名
	rcvr   reflect.Value          // 服务实现类
	typ    reflect.Type           // 服务实现类类型
	method map[string]*methodType // 当前服务接口注册进来的方法列表
}

type methodType struct {
	sync.Mutex 
	method     reflect.Method
	ArgType    reflect.Type 
	ReplyType  reflect.Type
	numCalls   uint
}

下面是golang rpc通信使用到的请求和响应对象结构 , 请求和响应对象都会采用对象池进行复用,所以都有next属性:

// Request is a header written before every RPC call. It is used internally
// but documented here as an aid to debugging, such as when analyzing
// network traffic.
type Request struct {
	ServiceMethod string   // format: "Service.Method"
	Seq           uint64   // sequence number chosen by client
	next          *Request // for free list in Server
}

// Response is a header written before every RPC return. It is used internally
// but documented here as an aid to debugging, such as when analyzing
// network traffic.
type Response struct {
	ServiceMethod string    // echoes that of the Request
	Seq           uint64    // echoes that of the request
	Error         string    // error, if any.
	next          *Response // for free list in Server
}

服务注册

通过调用RegisterName函数,我们可以向rpc server的服务映射集合中保存当前服务信息:

// 服务名 , 服务实现类
func RegisterName(name string, rcvr any) error {
	return DefaultServer.RegisterName(name, rcvr)
}

func (server *Server) RegisterName(name string, rcvr any) error {
	return server.register(rcvr, name, true)
}

func (server *Server) register(rcvr any, name string, useName bool) error {
    // 创建一个新的服务信息类
	s := new(service)
	// 反射获取当前服务实现类的类型和值
	s.typ = reflect.TypeOf(rcvr)
	s.rcvr = reflect.ValueOf(rcvr)
	// 保存服务名
	sname := name
	// useName 表示是否使用传入的name作为服务名 , 如果为false , 则采用服务实现类的类型名
	if !useName {
		sname = reflect.Indirect(s.rcvr).Type().Name()
	}
	if sname == "" {
		s := "rpc.Register: no service name for type " + s.typ.String()
		log.Print(s)
		return errors.New(s)
	}
	// 如果采用服务实现类的类型名作为服务名,要确保服务实现类是导出的,对外可见
	if !useName && !token.IsExported(sname) {
		s := "rpc.Register: type " + sname + " is not exported"
		log.Print(s)
		return errors.New(s)
	}
	s.name = sname

	// 构建注册服务方法列表信息
	s.method = suitableMethods(s.typ, logRegisterError)

	if len(s.method) == 0 {
		str := ""

		// To help the user, see if a pointer receiver would work.
		method := suitableMethods(reflect.PointerTo(s.typ), false)
		if len(method) != 0 {
			str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
		} else {
			str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
		}
		log.Print(str)
		return errors.New(str)
	}
    // 判断服务名是否重复
	if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
		return errors.New("rpc: service already defined: " + sname)
	}
	return nil
}

suitableMethods方法用于遍历当前服务实现类所有导出方法,并筛选出符合RPC调用格式的方法列表:

func suitableMethods(typ reflect.Type, logErr bool) map[string]*methodType {
	methods := make(map[string]*methodType)
	// 遍历当前服务实现类的所有方法
	for m := 0; m < typ.NumMethod(); m++ {
	    // 定位方法元数据对象
		method := typ.Method(m)
		// 获取方法类型和方法名
		mtype := method.Type
		mname := method.Name
		// 跳过未导出的方法
		if !method.IsExported() {
			continue
		}
		// Method needs three ins: receiver, *args, *reply.
		// 方法参数必须有两个,第一个用于作为请求参数,第二个用于接收请求结果
		if mtype.NumIn() != 3 {
			if logErr {
				log.Printf("rpc.Register: method %q has %d input parameters; needs exactly three\n", mname, mtype.NumIn())
			}
			continue
		}
		// First arg need not be a pointer.
		// 第一个参数可以不是指针类型
		argType := mtype.In(1)
		if !isExportedOrBuiltinType(argType) {
			if logErr {
				log.Printf("rpc.Register: argument type of method %q is not exported: %q\n", mname, argType)
			}
			continue
		}
		// Second arg must be a pointer.
		// 第二个参数必须是指针类型
		replyType := mtype.In(2)
		if replyType.Kind() != reflect.Pointer {
			if logErr {
				log.Printf("rpc.Register: reply type of method %q is not a pointer: %q\n", mname, replyType)
			}
			continue
		}
		// Reply type must be exported.
		// 第二个参数类型必须是导出的
		if !isExportedOrBuiltinType(replyType) {
			if logErr {
				log.Printf("rpc.Register: reply type of method %q is not exported: %q\n", mname, replyType)
			}
			continue
		}
		// 方法必须只有一个返回值,同时返回值类型必须是error类型
		// Method needs one out.
		if mtype.NumOut() != 1 {
			if logErr {
				log.Printf("rpc.Register: method %q has %d output parameters; needs exactly one\n", mname, mtype.NumOut())
			}
			continue
		}
		// The return type of the method must be error.
		if returnType := mtype.Out(0); returnType != typeOfError {
			if logErr {
				log.Printf("rpc.Register: return type of method %q is %q, must be error\n", mname, returnType)
			}
			continue
		}
		// 构造方法类型信息: 方法元数据本身,方法第一个入参类型,方法第二个入参类型
		methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
	}
	return methods
}

请求处理

本文一开始给出的Demo是借助 Http Server 来 Accept 用户连接,当接收到用户连接后,会通过Hijack获取到原生连接,然后后续该连接上的客户端读写事件都采用gob编码进行通信,而非http协议了:

func (server *Server) ServeConn(conn io.ReadWriteCloser) {
	buf := bufio.NewWriter(conn)
	// 构建gob编码器
	srv := &gobServerCodec{
		rwc:    conn,
		dec:    gob.NewDecoder(conn),
		enc:    gob.NewEncoder(buf),
		encBuf: buf,
	}
	// 使用gob编码器从连接到读取字节流,然后按照golang RPC协议执行反序列化
	server.ServeCodec(srv)
}

ServeCodec 函数会按照gob编码反序列化得到RPC请求头和请求数据,然后调用目标,最终将结果按gob编码执行序列化,写会connection中:

func (server *Server) ServeCodec(codec ServerCodec) {
	sending := new(sync.Mutex)
	wg := new(sync.WaitGroup)
	for {
	    // 解析得到请求数据
		service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
		if err != nil {
			if debugLog && err != io.EOF {
				log.Println("rpc:", err)
			}
			// 读取完所有请求后,退出循环
			if !keepReading {
				break
			}
			// send a response if we actually managed to read a header.
			if req != nil {
				server.sendResponse(sending, req, invalidRequest, codec, err.Error())
				server.freeRequest(req)
			}
			continue
		}
		wg.Add(1)
		// 处理请求调用
		go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
	}
	// We've seen that there are no more requests.
	// Wait for responses to be sent before closing codec.
	// 等待所有响应被处理完毕
	wg.Wait()
	codec.Close()
}

golang rpc 调用,发出的请求数据由两部分组成,首先是请求头,其次是RPC函数入参数的第一个对象,同样也是按照这个顺序依次执行反序列化读取:

func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {
    // 解析请求头
	service, mtype, req, keepReading, err = server.readRequestHeader(codec)
	if err != nil {
		if !keepReading {
			return
		}
		// discard body
		codec.ReadRequestBody(nil)
		return
	}
     
	// Decode the argument value.
	argIsValue := false 
	// 如果rpc方法的第一个参数(请求参数)类型是指针,则解引用拿到原始类型
	// 然后以原始类型分配一块新的内存,返回指向该内存的指针
	if mtype.ArgType.Kind() == reflect.Pointer {
		argv = reflect.New(mtype.ArgType.Elem())
	} else {
		argv = reflect.New(mtype.ArgType)
		argIsValue = true
	}
	// 反序列化得到请求参数的具体值,设置到argv指向到的零值结构体中
	if err = codec.ReadRequestBody(argv.Interface()); err != nil {
		return
	}
	// 如果目标RPC方法的请求入参是值类型,则进行解引用
	if argIsValue {
		argv = argv.Elem()
	}
    
    // 为第二个参数(返回值参数)同样初始化零值
	replyv = reflect.New(mtype.ReplyType.Elem())
   // 如果返回值参数类型为Map或者Slice,则初始化空map或切片
	switch mtype.ReplyType.Elem().Kind() {
	case reflect.Map:
		replyv.Elem().Set(reflect.MakeMap(mtype.ReplyType.Elem()))
	case reflect.Slice:
		replyv.Elem().Set(reflect.MakeSlice(mtype.ReplyType.Elem(), 0, 0))
	}
	return
}

golang rpc 请求头由调用方法信息和请求序列号组成 , 反序列化后,可以拿到服务名和方法名,根据方法名去server的服务映射集合中定位具体的方法元数据对象:

func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
	// 从请求对象池中获取一个空闲的请求对象
	req = server.getRequest()
	// 采用gob编码器将请求头部分字节流反序列化为req对象类型
	err = codec.ReadRequestHeader(req)
	if err != nil {
		req = nil
		// 字节流读完了
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			return
		}
		err = errors.New("rpc: server cannot decode request: " + err.Error())
		return
	}

	// We read the header successfully. If we see an error now,
	// we can still recover and move on to the next request.
	keepReading = true
    // 分割得到服务名和客户端想要调用的方法名 
	dot := strings.LastIndex(req.ServiceMethod, ".")
	if dot < 0 {
		err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
		return
	}
	serviceName := req.ServiceMethod[:dot]
	methodName := req.ServiceMethod[dot+1:]

	// Look up the request.
	// 根据服务名加载对应的服务信息类
	svci, ok := server.serviceMap.Load(serviceName)
	if !ok {
		err = errors.New("rpc: can't find service " + req.ServiceMethod)
		return
	}
	// 拿到服务信息类后,根据方法名定位获取到对应的方法类型
	svc = svci.(*service)
	mtype = svc.method[methodName]
	if mtype == nil {
		err = errors.New("rpc: can't find method " + req.ServiceMethod)
	}
	return
}

反序列化拿到请求数据后,便可以查询服务映射集合拿到对应的方法信息,最后我们便可以借助反射完成方法调用了:

func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
	if wg != nil {
		defer wg.Done()
	}
	mtype.Lock()
	// 当前方法调用次数加一
	mtype.numCalls++
	mtype.Unlock()
	// 拿到方法句柄
	function := mtype.method.Func
	// 传入方法实际调用者,即服务实现类,方法的第一个和第二个请求参数
	returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
	// 方法执行完毕后,拿到方法返回值 -- 代表error
	errInter := returnValues[0].Interface()
	errmsg := ""
	if errInter != nil {
		errmsg = errInter.(error).Error()
	}
	// 发送响应给客户端
	server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
	// 释放当前请求对象到对象池中
	server.freeRequest(req)
}

本地方法执行完毕后,需要组装响应对象,然后将响应对象执行gob编码,然后发送到连接中:

var invalidRequest = struct{}{}

func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply any, codec ServerCodec, errmsg string) {
    // 从响应池中获取到空闲的响应对象
	resp := server.getResponse()
	// Encode the response header
	// 组装响应对象
	resp.ServiceMethod = req.ServiceMethod
	if errmsg != "" {
		resp.Error = errmsg
		reply = invalidRequest
	}
	resp.Seq = req.Seq
	// 将响应对象执行gob编码,然后发送到conn中
	sending.Lock()
	err := codec.WriteResponse(resp, reply)
	if debugLog && err != nil {
		log.Println("rpc: writing response:", err)
	}
	sending.Unlock()
	// 将响应对象返回到对象池中
	server.freeResponse(resp)
}

客户端

数据结构

首先是代表客户端对象的Client结构:

// Client represents an RPC Client.
// There may be multiple outstanding Calls associated
// with a single Client, and a Client may be used by
// multiple goroutines simultaneously.
type Client struct {
	codec ClientCodec  // 请求数据编解码器,默认是gob协议

	reqMutex sync.Mutex // protects following
	request  Request  // 此处请求对象结构复用了/rpc/server包下的请求对象结构

	mutex    sync.Mutex // protects following
	seq      uint64.  // 请求序列号
	pending  map[uint64]*Call // 已经发出但还未回复的rpc调用
	closing  bool // user has called Close
	shutdown bool // server has told us to stop
}

Call 结构体承载了RPC远程调用的上下文信息

// Call represents an active RPC.
type Call struct {
	ServiceMethod string     // The name of the service and method to call.
	Args          any        // The argument to the function (*struct).
	Reply         any        // The reply from the function (*struct).
	Error         error      // After completion, the error status.
	Done          chan *Call // Receives *Call when Go is complete.
}

建立连接

当服务端采用HTTP协议来接收客户端连接时,客户端就必须通过调用DialHttp来与服务端建立连接:

func DialHTTP(network, address string) (*Client, error) {
    // 使用默认的RPC建立连接的请求路径: /_goRPC_ 
	return DialHTTPPath(network, address, DefaultRPCPath)
}

func DialHTTPPath(network, address, path string) (*Client, error) {
    // 建立TCP连接
	conn, err := net.Dial(network, address)
	if err != nil {
		return nil, err
	}
	// 发出connect请求
	io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")

	// Require successful HTTP response
	// before switching to RPC protocol.
	// 再转换为采用RPC协议通信时,需要确保此处的响应是成功的
	resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
	if err == nil && resp.Status == connected {
		return NewClient(conn), nil
	}
	if err == nil {
		err = errors.New("unexpected HTTP response: " + resp.Status)
	}
	conn.Close()
	return nil, &net.OpError{
		Op:   "dial-http",
		Net:  network + " " + address,
		Addr: nil,
		Err:  err,
	}
}

当成功连接服务端时,会创建一个新的客户端对象并返回:

func NewClient(conn io.ReadWriteCloser) *Client {
	encBuf := bufio.NewWriter(conn)
	// client端默认采用gob编码
	client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
	return NewClientWithCodec(client)
}

但是在一个新的客户端初始化时,会启动一个永不停歇的协程来不断接收并处理来自服务端的响应数据:

func NewClientWithCodec(codec ClientCodec) *Client {
	client := &Client{
		codec:   codec,
		pending: make(map[uint64]*Call),
	}
	// 启动一个永不停歇的协程来不断接收并处理来自服务端的响应数据
	go client.input()
	return client
}

input 协程采用死循环来不断读取服务端响应,并进行处理:

func (client *Client) input() {
	var err error
	var response Response // rpc/server包下的Response对象
	// 死循环来不断接收服务端响应,直到解析请求体的过程中出现错误,才会退出循环
	for err == nil {
		response = Response{} 
		// 读取响应头
		err = client.codec.ReadResponseHeader(&response)
		if err != nil {
			break
		}
		// 拿到响应序列号,得知该响应是对客户端发出的哪个请求的响应
		seq := response.Seq
		client.mutex.Lock()
		// 从pending集合中定位对应的call对象
		call := client.pending[seq]
		// 从集合中移除该对象
		delete(client.pending, seq)
		client.mutex.Unlock()

		switch {
		// 如果pending集合中不存在call对象,说明可能是重复响应,说明存在错误
		case call == nil:
			// We've got no pending call. That usually means that
			// WriteRequest partially failed, and call was already
			// removed; response is a server telling us about an
			// error reading request body. We should still attempt
			// to read error body, but there's no one to give it to.
			err = client.codec.ReadResponseBody(nil)
			if err != nil {
				err = errors.New("reading error body: " + err.Error())
			}
		// 响应头中错误信息不为空	
		case response.Error != "":
			// We've got an error response. Give this to the request;
			// any subsequent requests will get the ReadResponseBody
			// error if there is one.
			call.Error = ServerError(response.Error)
			err = client.codec.ReadResponseBody(nil)
			if err != nil {
				err = errors.New("reading error body: " + err.Error())
			}
			// 通知本次请求结束
			call.done()
	    // 正常响应 		
		default:
		    // 读取响应结果
			err = client.codec.ReadResponseBody(call.Reply)
			// 存在错误则记录
			if err != nil {
				call.Error = errors.New("reading body " + err.Error())
			}
			// 通知本次请求处理结束
			call.done()
		}
	}
	// 如果解析请求体的过程中出现错误,则退出上面的循环 
	// Terminate pending calls.
	client.reqMutex.Lock()
	client.mutex.Lock()
	client.shutdown = true
	closing := client.closing
	if err == io.EOF {
		if closing {
			err = ErrShutdown
		} else {
			err = io.ErrUnexpectedEOF
		}
	}
	// 终止所有已发送还未接收到响应的请求
	for _, call := range client.pending {
		call.Error = err
		call.done()
	}
	client.mutex.Unlock()
	client.reqMutex.Unlock()
	if debugLog && err != io.EOF && !closing {
		log.Println("rpc: client protocol error:", err)
	}
}

请求调用

rpc client端通过调用Call方法来完成远程过程调用:

func (client *Client) Call(serviceMethod string, args any, reply any) error {
	// 同步阻塞直到请求响应接收到为止,Done信号在input协程中被设置,或者请求发送过程中出现错误时被设置
	call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
	return call.Error
}

func (client *Client) Go(serviceMethod string, args any, reply any, done chan *Call) *Call {
	// 构建请求调用对象
	call := new(Call)
	call.ServiceMethod = serviceMethod
	call.Args = args
	call.Reply = reply
	if done == nil {
		done = make(chan *Call, 10) // buffered.
	} else {
		// If caller passes done != nil, it must arrange that
		// done has enough buffer for the number of simultaneous
		// RPCs that will be using that channel. If the channel
		// is totally unbuffered, it's best not to run at all.
		if cap(done) == 0 {
			log.Panic("rpc: done channel is unbuffered")
		}
	}
	call.Done = done
	// 发送请求
	client.send(call)
	return call
}

实际请求发送会调用client的send方法完成:

func (client *Client) send(call *Call) {
	client.reqMutex.Lock()
	defer client.reqMutex.Unlock()

	// Register this call.
	client.mutex.Lock()
	if client.shutdown || client.closing {
		client.mutex.Unlock()
		call.Error = ErrShutdown
		call.done()
		return
	}
	// 为当前请求设置请求序列号,同时将当前请求调用添加进pending集合
	seq := client.seq
	client.seq++
	client.pending[seq] = call
	client.mutex.Unlock()

	// Encode and send the request.
	// 构建请求对象
	client.request.Seq = seq
	client.request.ServiceMethod = call.ServiceMethod
	// 发送请求 --- 此处发送完毕请求后,就直接返回了,不会等待响应结果
	err := client.codec.WriteRequest(&client.request, call.Args)
	if err != nil {
		client.mutex.Lock()
		call = client.pending[seq]
		delete(client.pending, seq)
		client.mutex.Unlock()
		if call != nil {
			call.Error = err
			call.done()
		}
	}
}

延伸

异步调用

上文中举的例子,客户端实际是同步调用模式,首先WriteRequest发送请求方法是异步的,但是Call方法会等待直到Done信号有值时,才会返回。

改造为异步模式也很简单,直接调用Go方法,并在合适的时机调用监听Done通道是否有值即可:

func NewClient() *common.Response {
	client, err := rpc.DialHTTP("tcp", ":1234")
	if err != nil {
		log.Fatal("dialing: ", err)
	}

	res := &common.Response{}

	call := client.Go("helloService.Hello", &common.Request{
		map[string]interface{}{
			"client": "val1",
		}, map[string]interface{}{
			"data": "hello world",
		},
	}, res, nil)

	ticker := time.NewTicker(time.Millisecond)
	defer ticker.Stop()

	select {
	case replyCall := <-call.Done:
		if err := replyCall.Error; err != nil {
			fmt.Println("rpc error:", err)
		} else {
			fmt.Printf("res= %v", replyCall)
		}
	case t := <-ticker.C:
		fmt.Println("Current time: ", t)
	}
	
	return res
}

定制服务名

默认情况下,rpc.Register()将方法接收者(receiver)的类型名作为服务名。我们也可以自己设置。这时需要调用RegisterName(name string, rcvr interface{}) error方法,我们一开始给出的例子就是采用了后者,忘记的可以回看源码。


采用TPC协议建立连接

上面我们都是使用 HTTP 协议来实现 rpc 服务的,rpc库也支持直接使用 TCP 协议。首先,服务端先调用net.Listen("tcp", ":1234")创建一个监听某个 TCP 端口的监听器(Accepter),然后使用rpc.Accept(l)在此监听器上接受连接并处理:

type HelloServiceImpl int

func NewServer() {
	helloImpl := new(HelloServiceImpl)
	l, err := net.Listen("tcp", ":1236")
	if err != nil {
		return
	}

	rpc.Register(helloImpl)
	rpc.Accept(l)
}

func (s *HelloServiceImpl) Hello(request *common.Request, response *common.Response) error {
	response.Header = request.Header
	response.Params = map[string]interface{}{
		"data": "Hello World",
	}
	return nil
}

此处就相当于建立连接的时候就不采用http的connect请求方式了,只要TCP连接建立成功,就认为RPC连接建立成功:

func Accept(lis net.Listener) { DefaultServer.Accept(lis) }

func (server *Server) Accept(lis net.Listener) {
	for {
		conn, err := lis.Accept()
		if err != nil {
			log.Print("rpc.Serve: accept:", err.Error())
			return
		}
		go server.ServeConn(conn)
	}
}

然后,客户端调用rpc.Dial()以 TCP 协议连接到服务端:

func NewClient() *common.Response {
	client, err := rpc.Dial("tcp", ":1236")
	if err != nil {
		log.Fatal("dialing: ", err)
	}

	res := &common.Response{}

	call := client.Go("helloService.Hello", &common.Request{
		map[string]interface{}{
			"client": "val1",
		}, map[string]interface{}{
			"data": "hello world",
		},
	}, res, nil)

	ticker := time.NewTicker(time.Millisecond)
	defer ticker.Stop()

	select {
	case replyCall := <-call.Done:
		if err := replyCall.Error; err != nil {
			fmt.Println("rpc error:", err)
		} else {
			fmt.Printf("res= %v", replyCall)
		}
	case t := <-ticker.C:
		fmt.Println("Current time: ", t)
	}

	return res
}

相比于基于Http协议建立连接的方式,此处就直接建立TCP连接就完事了,而无需再发送Connect请求:

// Dial connects to an RPC server at the specified network address.
func Dial(network, address string) (*Client, error) {
	conn, err := net.Dial(network, address)
	if err != nil {
		return nil, err
	}
	return NewClient(conn), nil
}

自定义编码格式

默认客户端与服务端之间的数据使用gob编码,我们可以使用其它的格式来编码。在服务端,我们要实现rpc.ServerCodec接口:

// src/net/rpc/server.go
type ServerCodec interface {
  ReadRequestHeader(*Request) error
  ReadRequestBody(interface{}) error
  WriteResponse(*Response, interface{}) error

  Close() error
}

实际上不用这么麻烦,我们查看源码看看gobServerCodec是怎么实现的,然后仿造实现一个就行了。下面我实现了一个 JSON 格式的编解码器:

type JsonServerCodec struct {
  rwc    io.ReadWriteCloser
  dec    *json.Decoder
  enc    *json.Encoder
  encBuf *bufio.Writer
  closed bool
}

func NewJsonServerCodec(conn io.ReadWriteCloser) *JsonServerCodec {
  buf := bufio.NewWriter(conn)
  return &JsonServerCodec{conn, json.NewDecoder(conn), json.NewEncoder(buf), buf, false}
}

func (c *JsonServerCodec) ReadRequestHeader(r *rpc.Request) error {
  return c.dec.Decode(r)
}

func (c *JsonServerCodec) ReadRequestBody(body interface{}) error {
  return c.dec.Decode(body)
}

func (c *JsonServerCodec) WriteResponse(r *rpc.Response, body interface{}) (err error) {
  if err = c.enc.Encode(r); err != nil {
    if c.encBuf.Flush() == nil {
      log.Println("rpc: json error encoding response:", err)
      c.Close()
    }
    return
  }
  if err = c.enc.Encode(body); err != nil {
    if c.encBuf.Flush() == nil {
      log.Println("rpc: json error encoding body:", err)
      c.Close()
    }
    return
  }
  return c.encBuf.Flush()
}

func (c *JsonServerCodec) Close() error {
  if c.closed {
    return nil
  }
  c.closed = true
  return c.rwc.Close()
}

server端的for循环中需要创建编解码器JsonServerCodec传给ServeCodec方法:

func NewServer() {
	helloImpl := new(HelloServiceImpl)
	l, err := net.Listen("tcp", ":1236")
	if err != nil {
		return
	}

	rpc.Register(helloImpl)

	for {
		conn, err := l.Accept()
		if err != nil {
			return
		}
		go rpc.ServeCodec(common.NewJsonServerCodec(conn))
	}
}

同样的,客户端要实现rpc.ClientCodec接口,也是仿造gobClientCodec的实现:

type JsonClientCodec struct {
  rwc    io.ReadWriteCloser
  dec    *json.Decoder
  enc    *json.Encoder
  encBuf *bufio.Writer
}

func NewJsonClientCodec(conn io.ReadWriteCloser) *JsonClientCodec {
  encBuf := bufio.NewWriter(conn)
  return &JsonClientCodec{conn, json.NewDecoder(conn), json.NewEncoder(encBuf), encBuf}
}

func (c *JsonClientCodec) WriteRequest(r *rpc.Request, body interface{}) (err error) {
  if err = c.enc.Encode(r); err != nil {
    return
  }
  if err = c.enc.Encode(body); err != nil {
    return
  }
  return c.encBuf.Flush()
}

func (c *JsonClientCodec) ReadResponseHeader(r *rpc.Response) error {
  return c.dec.Decode(r)
}

func (c *JsonClientCodec) ReadResponseBody(body interface{}) error {
  return c.dec.Decode(body)
}

func (c *JsonClientCodec) Close() error {
  return c.rwc.Close()
}

要使用NewClientWithCodec以指定的编解码器创建客户端:

func NewClient() *common.Response {
	conn, err := net.Dial("tcp", ":1234")
	if err != nil {
		return nil
	}

	client := rpc.NewClientWithCodec(common.NewJsonClientCodec(conn))
	res := &common.Response{}

	err = client.Call("helloService.Hello", &common.Request{
		map[string]interface{}{
			"client": "val1",
		}, map[string]interface{}{
			"data": "hello world",
		},
	}, res)

	return res
}

自定义服务器

实际上,上面我们调用的方法rpc.Register,rpc.RegisterName,rpc.ServeConn,rpc.ServeCodec都是转而去调用默认DefaultServer的相关方法:

// src/net/rpc/server.go
var DefaultServer = NewServer()

func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }

func RegisterName(name string, rcvr interface{}) error {
  return DefaultServer.RegisterName(name, rcvr)
}

func ServeConn(conn io.ReadWriteCloser) {
  DefaultServer.ServeConn(conn)
}

func ServeCodec(codec ServerCodec) {
  DefaultServer.ServeCodec(codec)
}

但是因为DefaultServer是全局共享的,如果有第三方库使用了相关方法,并且注册了一些对象的方法,我们引用这个第三方库之后,就出现两个问题。第一,可能与我们注册的方法冲突;第二,带来额外的安全隐患(库中方法直接panic?)。故而推荐做法是自己NewServer:

func main() {
  arith := new(Arith)
  server := rpc.NewServer()
  server.RegisterName("math", arith)
  server.HandleHTTP(rpc.DefaultRPCPath, rpc.DefaultDebugPath)

  if err := http.ListenAndServe(":1234", nil); err != nil {
    log.Fatal("serve error:", err)
  }
}

这其实是一个套路,很多库会提供一个默认的实现直接使用,如log、net/http这些库。但是也提供了创建和自定义的方法。一般测试时为了方便可以使用默认实现,实践中最好自己创建相应的对象,避免干扰和安全问题。


参考

延伸部分主要摘录至: Go 每日一库之 rpc

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/225501.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

忘记PDF密码了,怎么办?

PDF文件有两种密码&#xff0c;一个打开密码、一个限制编辑密码&#xff0c;因为PDF文件设置了密码&#xff0c;那么打开、编辑PDF文件就会受到限制。忘记了PDF密码该如何解密&#xff1f; PDF和office一样&#xff0c;可以对文件进行加密&#xff0c;但是没有提供恢复密码的功…

动态代理IP和静态代理IP有什么区别,适用场景是什么?

互联网行业的从业者经常会用到一种工具&#xff0c;那就是代理IP工具。动态代理IP和静态代理IP是两种常见的代理IP技术&#xff0c;它们在网络通信中起到了重要的作用&#xff0c;比如大数据行业的从业者会经常需要用到动态代理IP&#xff0c;跨境行业的从业者会经常用到静态代…

MySQL 忘记root密码后重置密码操作

在忘记 MySQL 密码的情况下&#xff0c;可以通过 --skip-grant-tables 关闭服务器的认证&#xff0c;然后重置 root 的密码&#xff0c;具体操作步骤如下。 步骤 1)&#xff1a;关闭正在运行的 MySQL 服务。打开 cmd 进入 MySQL 的 bin 目录。 步骤 2)&#xff1a;输入mysqld -…

Constraining Async Clock Domain Crossing

Constraining Async Clock Domain Crossing 我们在normal STA中只会去check 同步clock之间的timing,但是design中往往会存在很多CDC paths,这些paths需要被正确约束才能保证design function正确,那么怎么去约束这些CDC paths呢? 以下面的design为例,如下图所示 这里clk…

基于Linux的网络防火墙设计方法

摘要 随着Internet的迅速发展&#xff0c;网络越来越成为了人们日常生活不可或缺的一部分&#xff0c;而随之引出的网络安全问题也越来越突出&#xff0c;成为人们不得不关注的问题。 为了在一个不安全的网际环境中构造出一个相对安全的环境&#xff0c;保证子网环境下的计算机…

lorenz相图

观察Lorenz在各个不同维度上的相图。 lorenz_demo(50) function xdot g(t,x) xdot zeros(3,1); sig 10.0; rho 28.0; bet 8.0/3.0; xdot(1) sig*(x(2)-x(1)); xdot(2) rho*x(1)-x(2)-x(1)*x(3); xdot(3) x(1)*x(2)-bet*x(3); endfunction lorenz_demo(time) [t,x] ode…

系统思考与啤酒游戏经营沙盘

结束一家汽车零配件公司《系统思考与啤酒游戏经营沙盘》的内训课&#xff0c;4个小组基本上都有共同的心智模式&#xff0c;这也代表团队有一些集体的盲点。不仅仅对啤酒游戏经营沙盘做了复盘&#xff0c;同时也借用学员画出的系统环路图完成真实案例的研讨以及团队共识&#x…

JVM 运行时内存篇

面试题&#xff1a; 讲一下为什么JVM要分为堆、方法区等&#xff1f;原理是什么&#xff1f;&#xff08;UC、智联&#xff09; JVM的分区了解吗&#xff0c;内存溢出发生在哪个位置 &#xff08;亚信、BOSS&#xff09; 简述各个版本内存区域的变化&#xff1…

<习题集><LeetCode><链表><2/19/21/23/24>

目录 2. 两数相加 19. 删除链表的倒数第 N 个结点 21. 合并两个有序链表 23. 合并 K 个升序链表 24. 两两交换链表中的节点 2. 两数相加 https://leetcode.cn/problems/add-two-numbers/ public ListNode addTwoNumbers(ListNode l1, ListNode l2) {//head是cur链表头节点…

Jenkins中文官网地址

Jenkins中文官网地址&#xff1a; Jenkins 用户手册Jenkins 是一个开源自动化服务器https://www.jenkins.io/zh/doc/

RCS(限制立方样条图)ggrcs包

致敬天桥下的卖艺者&#xff0c;零基础说科研&#xff0c;写的ggrcs包新的2.6版本已经在CRAN上线&#xff0c;增加了多元线性回归分析模块&#xff0c;现在支持逻辑回归&#xff08;logistic回归&#xff09;、cox回归和多元线性回归 需要的可以使用代码安装 install.packages(…

MATLAB将动画演示以及将过程保存为gif动态图片

平时想要做一个动画图片来演示&#xff0c;本人一般有两种方法&#xff1a; 一种是截很多张图之后&#xff0c;将这些图片合成为一张gif动画&#xff1b; 另一种就是录屏再制作成gif&#xff0c;我一般是录下视频之后&#xff0c;使用QQ影音&#xff0c;里面的影音工具箱有一个…

轻松掌握接口测试!丰富你的测试技能!

接口测试是测试系统组件间接口的一种测试。接口测试主要用于检测外部系统与系统之间以及内部各个子系统之间的交互点。测试的重点是要检查数据的交换&#xff0c;传递和控制管理过程&#xff0c;以及系统间的相互逻辑依赖关系等。 简答的说就是通过URL像服务器或者其他模块等&…

【EI征稿中|ACM出版】2023 人工智能、系统与网络安全国际学术会议 (AISNS 2023)

2023 人工智能、系统与网络安全国际学术会议 (AISNS 2023&#xff09; 2023 International Conference on Artificial Intelligence, Systems and Network Security 由西南科技大学计算机科学与技术学院主办的2023人工智能、系统与网络安全国际学术会议 (AISNS 2023&#xff…

离线数仓构建案例一

数据采集 日志数据&#xff08;文件&#xff09;到Kafka 自己写个程序模拟一些用户的行为数据&#xff0c;这些数据存在一个文件夹中。 接着使用flume监控采集这些文件&#xff0c;然后发送给kafka中待消费。 1、flume采集配置文件 监控文件将数据发给kafka的flume配置文件…

系统架构设计面试题

十大经典系统架构设计面试题_架构_程序员石磊_InfoQ写作社区翻译自&#xff1a;https://medium.com/geekculture/top-10-system-design-interview-questions-10f7b5ea123d在我作为微软和Facebhttps://xie.infoq.cn/article/4c0c9328a725a76922f6547ad 任何 SDI 问题的提示 通过…

【PyTorch】softmax回归

文章目录 1. 模型与代码实现1.1. 模型1.2. 代码实现 2. Q&A2.1. 运行过程中出现以下警告&#xff1a;2.2. 定义的神经网络中的nn.Flatten()的作用是什么&#xff1f;2.3. num_workers有什么作用&#xff1f;它的值怎么确定&#xff1f; 1. 模型与代码实现 1.1. 模型 背景…

用C语言实现链栈的基本操作

#include <stdio.h> #include <malloc.h> #define ElemType char//相当于ElemType等同于char类型 //链式结构 数据域指针域 typedef struct LinkStackNode//定义一个链栈的结构体类型 {ElemType data;//ElemType是链栈的元素类型&#xff0c;代表数据域struct Lin…

Let‘s Encrypt安全证书的步骤及使用-基于centos9, 包括工具certbot安装及使用,获取apache、nginx、iis等服务器安全证书

Lets Encrypt 介绍 Lets Encrypt 是一个免费、自动化的证书颁发机构&#xff08;CA&#xff0c;Certificate Authority&#xff09;&#xff0c;致力于为网站提供免费的SSL/TLS证书。以下是关于Lets Encrypt安全证书的详细介绍&#xff1a; 特点和背景&#xff1a; 免费&#…

zedbox 实现配置 yolov5

Stereolabs 的 ZED Box 是一款由 NVIDIA Jetson™ 提供支持的紧凑型网关&#xff0c;用于在移动和现场情况下挑战 AIoT&#xff08;物联网&#xff09;。它旨在将空间感知和理解带到边缘&#xff0c;并聚合、处理和分析来自 3D 传感器和设备的数据。 相关内容 1.win10下 cud…