Golang RPC实现-day01

导航

  • Golang RPC实现
    • 一、主体逻辑设计
    • 二、服务设计
      • 1、监听和接收请求
      • 2、处理请求
        • (1)服务结构体定义
        • (2)确认请求方和服务方编解码格式
        • (3)循环读取请求
        • (4)解析请求的内容
        • (5)响应请求
    • 三、读取和发送数据到连接中代码

Golang RPC实现

  • 先来一个最简单的版本,后续更新。
  • RPC也可以说是一种自定义的应用层协议
  • 所以我们需要自定义消息格式,消息包括 请求头和请求体,所以我们定义一个消息结构体
type request struct {
	h            *codec.Header // header of request
	argv, replyv reflect.Value // argv and replyv of request
}
  • 请求头结构体
type Header struct {
	ServiceMethod string // format "Service.Method" 调用的服务和方法
	Seq           uint64 // sequence number chosen by client 连接Id
	Error         string
}
  • 请求的第一条消息用来确定后续消息的格式和编码,这里规定第一条消息是以Json格式编码,对应的结构体如下
type Option struct {
	MagicNumber int        // MagicNumber marks this's a geerpc request
	CodecType   codec.Type // client may choose different Codec to encode body
}
  • 消息格式和编码结构体
var DefaultOption = &Option{
	MagicNumber: MagicNumber,
	CodecType:   codec.GobType,
}

一、主体逻辑设计

func main() {
	log.SetFlags(0)
	addr := make(chan string)
	go startServer(addr)//这是RPC的服务端逻辑

	// in fact, following code is like a simple geerpc client
	conn, _ := net.Dial("tcp", <-addr) //客户端拨号建立链接,每次服务逻辑,都通过conn来确定当前客户端/服务端是在哪一个连接上。

	defer func() { _ = conn.Close() }()

	time.Sleep(time.Second)
	// send options,先发了一个,定义后续数据的编码方式
	_ = json.NewEncoder(conn).Encode(geerpc.DefaultOption)//Encode方法就是发送一个Option结构体内容到conn连接中
	cc := codec.NewGobCodec(conn)
	// send request & receive response
	for i := 0; i < 5; i++ {//发五次请求
		h := &codec.Header{//定义每次请求的请求头
			ServiceMethod: "Foo.Sum",
			Seq:           uint64(i),
		}
		_ = cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq))//发送请求体和body内容
		_ = cc.ReadHeader(h)//接收服务端的响应的请求头内容,处理相应得按顺序,不能并发接收响应数据

		log.Println("clinet receive response:", h.ServiceMethod)
		var reply string
		_ = cc.ReadBody(&reply)//接收服务端的响应的请求体内容
		log.Println("reply:", reply)//打印
	}
}

二、服务设计

逻辑就是

  1. 监听请求
  2. 循环获取请求,异步处理请求
  3. 获取当前连接的第一条消息,确认后续消息的格式和编码
  4. 获取请求内容
  5. 响应请求

1、监听和接收请求

func startServer(addr chan string) {
	// pick a free port
	l, err := net.Listen("tcp", ":0")
	if err != nil {
		log.Fatal("network error:", err)
	}
	log.Println("start rpc server on", l.Addr())
	addr <- l.Addr().String()
	geerpc.Accept(l)//服务端接收请求
}

2、处理请求

(1)服务结构体定义
type Server struct{}
// NewServer returns a new Server.
func NewServer() *Server {
	return &Server{}
}
// DefaultServer is the default instance of *Server.
var DefaultServer = NewServer()
(2)确认请求方和服务方编解码格式
// Accept accepts connections on the listener and serves requests
// for each incoming connection.
func Accept(lis net.Listener) { DefaultServer.Accept(lis) }

// Accept accepts connections on the listener and serves requests
// for each incoming connection.
func (server *Server) Accept(lis net.Listener) {
	for {//不断接收请求
		conn, err := lis.Accept()
		if err != nil {
			log.Println("rpc server: accept error:", err)
			return
		}

		go server.ServeConn(conn)//异步处理请求
	}
}
// ServeConn runs the server on a single connection.
// ServeConn blocks, serving the connection until the client hangs up.
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
	log.Println("服务端处理连接中..... ")
	defer func() { _ = conn.Close() }()
	var opt Option
	if err := json.NewDecoder(conn).Decode(&opt); err != nil {//解析第一个Option,确定后续协议消息的格式
		log.Println("rpc server: options error: ", err)
		return
	}
	if opt.MagicNumber != MagicNumber {//服务方编码方式是否与客户端相同
		log.Printf("rpc server: invalid magic number %x", opt.MagicNumber)
		return
	}
	f := codec.NewCodecFuncMap[opt.CodecType]//服务方编码方式是否与客户端相同
	if f == nil {//服务端是否存在客户端对应编码方式
		log.Printf("rpc server: invalid codec type %s", opt.CodecType)
		return
	}
	server.serveCodec(f(conn))//第一个确认包通过后,再发后续消息,通过conn拿到连接信息,保证服务端后续能向conn发送信息
}
(3)循环读取请求
func (server *Server) serveCodec(cc codec.Codec) {
	sending := new(sync.Mutex) // make sure to send a complete response
	wg := new(sync.WaitGroup)  // wait until all request are handled
	for {
		req, err := server.readRequest(cc)//反复从请求方接收请求,这里会把请求头和请求体内容获取到req的结构体中
		if err != nil {
			if req == nil {//直到没有请求过来
				break // it's not possible to recover, so close the connection
			}
			req.h.Error = err.Error()
			server.sendResponse(cc, req.h, invalidRequest, sending)
			continue
		}
		wg.Add(1)
		go server.handleRequest(cc, req, sending, wg)//异步处理数据
	}
	wg.Wait()
	_ = cc.Close()
}
(4)解析请求的内容
func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) {
	// TODO, should call registered rpc methods to get the right replyv
	// day 1, just print argv and send a hello message
	defer wg.Done()
	log.Println("handleRequest :", req.h, req.argv.Elem())
	req.replyv = reflect.ValueOf(fmt.Sprintf("geerpc resp %d", req.h.Seq))
	
	server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
}
(5)响应请求
func (server *Server) sendResponse(cc codec.Codec, h *codec.Header, body interface{}, sending *sync.Mutex) {
	sending.Lock()
	defer sending.Unlock()
	//time.Sleep(time.Second)
	if err := cc.Write(h, body); err != nil {
		log.Println("rpc server: write response error:", err)
	}
}

三、读取和发送数据到连接中代码

package codec

import (
	"bufio"
	"encoding/gob"
	"io"
	"log"
)

type GobCodec struct {
	conn io.ReadWriteCloser
	buf  *bufio.Writer
	dec  *gob.Decoder
	enc  *gob.Encoder
}

var _ Codec = (*GobCodec)(nil)

func NewGobCodec(conn io.ReadWriteCloser) Codec {
	buf := bufio.NewWriter(conn)
	return &GobCodec{
		conn: conn,
		buf:  buf,
		dec:  gob.NewDecoder(conn),
		enc:  gob.NewEncoder(buf),
	}
}

func (c *GobCodec) ReadHeader(h *Header) error {
	return c.dec.Decode(h)
}

func (c *GobCodec) ReadBody(body interface{}) error {
	return c.dec.Decode(body)
}

func (c *GobCodec) Write(h *Header, body interface{}) (err error) {
	defer func() {
		_ = c.buf.Flush()
		if err != nil {
			_ = c.Close()
		}
	}()
	if err = c.enc.Encode(h); err != nil {
		log.Println("rpc: gob error encoding header:", err)
		return
	}
	if err = c.enc.Encode(body); err != nil {
		log.Println("rpc: gob error encoding body:", err)
		return
	}
	return
}

func (c *GobCodec) Close() error {
	return c.conn.Close()
}


欢迎大家关注我的博客在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/626898.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

BakedSDF: Meshing Neural SDFs for Real-Time View Synthesis 论文阅读

&#xff08;水一篇博客&#xff09; 项目主页 BakedSDF: Meshing Neural SDFs for Real-Time View Synthesis 作者介绍 是 Mildenhall 和 Barron 参与的工作&#xff08;都是谷歌的&#xff09;&#xff0c;同时一作是 Lipman 的学生&#xff0c;VolSDF 的一作。本文引用…

使用Caché管理工具

Cach通过一个web工具来对其进行系统管理和完成管理任务,该方法的一个好处是不必将Cach安装到用于管理的系统上。目前,通过网络远程管理和控制对站点的访问,这些都比较容易。因为数据及其格式信息都直接来自被管理的系统,因此,这也可以最小化跨版本的兼容问题。 本文将描述…

企业微信hook接口协议,ipad协议http,获取群成员列表简洁版

获取群成员列表简洁版 参数名必选类型说明uuid是String每个实例的唯一标识&#xff0c;根据uuid操作具体企业微信 请求示例 {"uuid":"3240fde0-45e2-48c0-90e8-cb098d0ebe43","roomid":10696052955016166 } 返回示例 {"data": {&q…

K8S内容

K8S介绍 1、故障迁移:当某一个node节点关机或挂掉后&#xff0c;node节点上的服务会自动转移到另一个node节点上&#xff0c;这个过程所有服务不中断。这是docker或普通云主机是不能做到的 2、资源调度:当node节点上的cpu、内存不够用的时候&#xff0c;可以扩充node节点&…

基于SSM的“口腔护理网站”的设计与实现(源码+数据库+文档)

基于SSM的“口腔护理网站”的设计与实现&#xff08;源码数据库文档) 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;SSM 工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 首页 用户注册页面 医生信息查看模块 口腔护理预约模块 后台首页面…

零基础10 天入门 Web3之第3天

10 天入门 Web3之第3天 什么是以太坊&#xff0c;以太坊能做什么&#xff1f;Web3 是互联网的下一代&#xff0c;它将使人们拥有自己的数据并控制自己的在线体验。Web3 基于区块链技术&#xff0c;该技术为安全、透明和可信的交易提供支持。我准备做一个 10 天的学习计划&…

Anaconda下载安装

看到这篇文章的同学们&#xff0c;说明你们是要下载Anaconda&#xff0c;这篇文章讲的就是下载安装教程。 Anaconda下载网址&#xff1a; Download Now | Anaconda 根据我们需要的系统版本下载&#xff0c;我的电脑是window&#xff0c;所以选择第一个&#xff0c;如下图&am…

苍穹外卖-day01(SpringBoot+SSM的企业级Java项目实战)

苍穹外卖-day01 课程内容 软件开发整体介绍 苍穹外卖项目介绍 开发环境搭建 导入接口文档 Swagger 项目整体效果展示&#xff1a; 管理端-外卖商家使用 用户端-点餐用户使用 当我们完成该项目的学习&#xff0c;可以培养以下能力&#xff1a; 1. 软件开发整体介绍 作为…

Power query与Excel的区别,优势?

Power Query是Microsoft Excel的一个强大数据导入、转换和自动化的插件工具&#xff0c;它在Excel 2010之后的版本中被发布出来&#xff0c;随着时间的发展&#xff0c;功能不断增强。 以下是Power Query的一些优势以及它与Excel传统数据处理方式的区别和一些令人印象深刻的功…

鸿蒙内核源码分析 (TLFS 算法篇) | 图表解读 TLFS 原理

动态分配 本篇开始说一个耳朵听起老茧的概念 动态分配&#xff0c;将分成上下两篇&#xff0c;本篇为上篇&#xff0c;看完能快速理解下篇鸿蒙内核源码对动态内存的具体实现。 鸿蒙内核源码分析(TLFS算法) 结合图表从理论视角说清楚 TLFS 算法鸿蒙内核源码分析(内存池管理) 结…

分析 vs2019 cpp20 规范的 STL 库模板 function ,源码注释并探讨几个问题

&#xff08;1 探讨一&#xff09;第一个尝试弄清的问题是父类模板与子类模板的模板参数的对应关系&#xff0c;如下图&#xff1a; 我们要弄清的问题是创建 function 对象时&#xff0c;传递的模板参数 _Fty , 传递到其父类 _Func_class 中时 &#xff0c;父类的模板参数 _Ret…

面试集中营—rocketmq架构篇

一、基本定义 Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力&#xff0c;同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。 Topic&#xff1a;消息主题&…

多格式兼容的在线原型查看:Axure RP的便捷解决方案

Axure rp不仅可以绘制详细的产品构思&#xff0c;还可以在浏览器中生成html页面&#xff0c;但需要安装插件才能打开。安装Axure后 rpchrome插件后&#xff0c;还需要在扩展程序中选择“允许访问文件网站”&#xff0c;否则无法在Axure中成功选择 rp在线查看原型。听起来很麻烦…

添砖Java之路(其七)——static

目录 static&#xff1a; 1.被类的所有对象所共享(和c有点像) 2.多了一种调用方法&#xff0c;可以通过类名调用 3.随着类的加载而加载&#xff0c;是优先于对象的存在。 工具类&#xff1a; 为什么主类的方法要加static&#xff1a; 理解 public static void main&#…

喜大普奔!VMware Workstation Pro 17.5 官宣免费!

Broadcom 已经正式收购 VMware&#xff0c;【VMware中国】官方公众号已于3月11日更名为【VMware by Broadcom中国】。 13日傍晚&#xff0c;该公众号发表推文 V风拂面&#xff0c;好久不见 - 来自VMware 中国的问候 &#xff0c;意味着 VMware 带着惊喜和美好的愿景再次归来。 …

​​​【收录 Hello 算法】6.2 哈希冲突

目录 6.2 哈希冲突 6.2.1 链式地址 6.2.2 开放寻址 1. 线性探测 2. 平方探测 3. 多次哈希 6.2.3 编程语言的选择 6.2 哈希冲突 上一节提到&#xff0c;通常情况下哈希函数的输入空间远大于输出空间&#xff0c;因此理论上哈希冲突是不可避免的。比如&a…

基于GWO灰狼优化的CNN-GRU-Attention的时间序列回归预测matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1卷积神经网络&#xff08;CNN&#xff09;在时间序列中的应用 4.2 GRU网络 4.3 注意力机制&#xff08;Attention&#xff09; 4.4 GWO优化 5.算法完整程序工程 1.算法运行效果图预览…

S32K144 EB 和 MCAL 安装

首先安装 EB Design : Product Information : Automotive SW - AUTOSAR MCAL / QM (flexnetoperations.com) 参考 NXP_AUTOSAR_MCAL开发环境搭建引导_S32K14x系列_nxp的s32k144 sdk文档-CSDN博客 然后安装 MCAL 需要把 P1 的 Plugins 和 AUTOSAR\S32K14X_MCAL4_3_RTM_1_0_0\S…

暴雨分布式存储集群助重庆高校打造智慧校园

教育是国家发展的基石&#xff0c;教育兴则国家兴&#xff0c;教育强则国家强。党的二十大报告指出&#xff0c;“加快建设教育强国”&#xff0c;并提出到2035年“建成教育强国”的总体目标。随着数字时代的到来&#xff0c;以物联网、大数据、云计算和人工智能为代表的数字技…

【C语言】4.C语言数组(2)

文章目录 6. 二维数组的创建6.1 ⼆维数组的概念6.2 ⼆维数组的创建 7. 二维数组的初始化7.1 不完全初始化7.2 完全初始化7.3 按照⾏初始化7.4 初始化时省略⾏&#xff0c;但是不能省略列 8. 二维数组的使用8.1 ⼆维数组的下标8.2 ⼆维数组的输⼊和输出 9. 二维数组在内存中的存…