Golang RPC实现-day02

导航

  • Golang RPC实现
    • 一、客户端异步并发多个请求
      • 1、 客户端结构体
      • 2、 一个客户端,异步发送多个请求,使用`call`结构体代表客户端的每次请求
      • 3、客户端并发多个请求
      • 4、客户端接收请求

Golang RPC实现

  • day01 我们实现了简单的服务端客户端
  • 我们简单总结一下day01的模式。
  • 服务端按顺序处理客户端过来的请求,按顺序响应客户端的请求。
  • 客户端同步的方式发送请求,不能并发发出请求。
  • 那么我们day02干的事情就是,让客户端异步并发的发出请求(请求顺序变得随机),服务端依然是按请求顺序进行处理,处理完某一个请求就返回,可以不按请求的顺序响应数据,但是响应数据是要上锁的,否则会发生响应数据并发安全问题。
  • 主要逻辑是修改了客户端的代码,服务端和day01没有变化

一、客户端异步并发多个请求

1、 客户端结构体

type Client struct {
	cc       codec.Codec//编码方式
	opt      *Option//发出请求的第一个包,用来协商后续包的格式和编码方式
	sending  sync.Mutex // 当一个请求正在发送时,不可以转头去执行别的请求
	header   codec.Header // 请求头内容
	mu       sync.Mutex // protect following
	seq      uint64 //记录该客户端一次请求连接的序号,
	pending  map[uint64]*Call//通过seq快速找到客户端的某个请求
	closing  bool // user has called Close
	shutdown bool // server has told us to stop
}

2、 一个客户端,异步发送多个请求,使用call结构体代表客户端的每次请求

type Call struct {
	Seq           uint64	//当前请求的序号,唯一标识一个请求
	ServiceMethod string      // format "<service>.<method>" 此次请求的服务和方法
	Args          interface{} // arguments to the function 请求函数的参数
	Reply         interface{} // reply from the function 服务端函数的响应数据
	Error         error       // if error occurs, it will be set //发生错误时的信息
	Done          chan *Call  // Strobes when call is complete.完成一次请求通过chan来通知
}

3、客户端并发多个请求

  • 主函数逻辑
func main() {
	log.SetFlags(0)
	addr := make(chan string)
	go startServer(addr)
	client, _ := geerpc.Dial("tcp", <-addr)
	defer func() { _ = client.Close() }()

	time.Sleep(time.Second)
	// send request & receive response
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {//go 实现异步非阻塞发送多个请求
			defer wg.Done()
			args := fmt.Sprintf("geerpc req %d", i)//一次请求携带的数据
			var reply string
			if err := client.Call("Foo.Sum", args, &reply); err != nil {//call发出一次请求,&reply,传的是引用,如果有响应,就能接收到
				log.Fatal("call Foo.Sum error:", err)
			}
			log.Println("reply:", reply)
		}(i)
	}
	wg.Wait()
}
  • Call 准备发出一次请求
// Call invokes the named function, waits for it to complete,
// and returns its error status.
func (client *Client) Call(serviceMethod string, args, reply interface{}) error {
	call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done//阻塞等待此次请求的channel,直到服务端处理并响应才返回
	return call.Error
}
  • 绑定数据到请求中
// Go invokes the function asynchronously.
// It returns the Call structure representing the invocation.
func (client *Client) Go(serviceMethod string, args, reply interface{}, done chan *Call) *Call {
	if done == nil {
		done = make(chan *Call, 10)
	} else if cap(done) == 0 {
		log.Panic("rpc client: done channel is unbuffered")
	}
	call := &Call{
		ServiceMethod: serviceMethod,//此次请求的服务和方法
		Args:          args,//此次请求的参数
		Reply:         reply,//此处是引用类型,暂时还没有数据,等服务端响应就有数据了
		Done:          done,//绑定此次请求的响应channel,服务端响应后就往对应的channel发一条数据
	}
	client.send(call)
	return call
}
  • 发送请求到服务端
func (client *Client) send(call *Call) {
	// make sure that the client will send a complete request
	client.sending.Lock()
	defer client.sending.Unlock()

	// register this call.
	seq, err := client.registerCall(call)//注册这次call,把这次的请求ID注册到客户端中。。。
	if err != nil {
		call.Error = err
		call.done()
		return
	}

	// prepare request header
	client.header.ServiceMethod = call.ServiceMethod
	client.header.Seq = seq
	client.header.Error = ""

	// encode and send the request
	if err := client.cc.Write(&client.header, call.Args); err != nil {//发送请求头和请求参数
		call := client.removeCall(seq)
		// call may be nil, it usually means that Write partially failed,
		// client has received the response and handled
		if call != nil {
			call.Error = err
			call.done()
		}
	}
}

4、客户端接收请求

func (client *Client) receive() {
	var err error
	for err == nil {
		var h codec.Header
		if err = client.cc.ReadHeader(&h); err != nil { 接收请求是一个个来,当连接关闭时,此处会报错,退出整个客户端
			break
		}
		call := client.removeCall(h.Seq)//通过Seq唯一标识符删除一个请求
		switch {
		case call == nil:
			// it usually means that Write partially failed
			// and call was already removed.
			err = client.cc.ReadBody(nil)

		case h.Error != "":
			call.Error = fmt.Errorf(h.Error)
			err = client.cc.ReadBody(nil)
			call.done()
		default:
			err = client.cc.ReadBody(call.Reply)
			if err != nil {
				call.Error = errors.New("reading body " + err.Error())
			}
			call.done()//向通道发送一条消息,客户端等待的这个call可以推出了
		}
	}
	// error occurs, so terminateCalls pending calls
	client.terminateCalls(err)//关闭所有请求
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/630215.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Pencils Protocol Season 2 收官在即,展望Season 3 及其权益

此前 Scroll 生态 LaunchPad &聚合收益平台 Pencils Protocol&#xff08;原 Penpad&#xff09;&#xff0c;推出了首个资产即其生态代币 PDD 的 Launch&#xff0c;Season 2 活动主要是用户通过质押 ETH 代币、组件战队等方式&#xff0c;来获得 Point 奖励&#xff0c;并…

mysql的explain

explain可以用于select&#xff0c;delete&#xff0c;insert&#xff0c;update的statement。 当explain用于statement时&#xff0c;mysql将会给出其优化器&#xff08;optimizer&#xff09;的执行计划。 通过explain字段生成执行计划表。下面来解析这个执行计划表的每一列…

正则表达式和sed

一、正则表达式 主要用来匹配字符串&#xff08;命令结果&#xff0c;文本内容&#xff09;&#xff0c; 通配符匹配文件&#xff08;而且是已存在的文件&#xff09; 基本正则表达式 扩展正则表达式 1.元字符 . 匹配任意单个字符&#xff0c;可以是一个汉字 […

【企业宣传片】拍摄思维提升,专业影视质感核心揭密,一课搞定

课程下载&#xff1a;【企业宣传片】拍摄-课程网盘链接提取码下载.txt资源-CSDN文库 更多资源下载&#xff1a;关注我。 课程介绍 大量案例分析宣传片拍摄的痛点要点 根据案例告诉你解决方案&#xff0c;讲透概念 改变你对企业宣传片的思维层级与认知 归纳总结对比不同案…

基于SSM的婚恋网站的设计与实现(有报告)。Javaee项目。ssm项目。

演示视频&#xff1a; 基于SSM的婚恋网站的设计与实现&#xff08;有报告&#xff09;。Javaee项目。ssm项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构&#xff0c;通过Spri…

MS41908M替代AN41908

产品简述 MS41908M 是一款用于网络摄像机和监控摄像机的镜头 驱动芯片他可完全替代AN41908。 芯片内置光圈控制功能&#xff1b;通过电压驱动方式以及扭矩纹 波修正技术&#xff0c;实现了噪声微步驱动。 主要特点 电压驱动方式&#xff0c;256 微步驱动电路&#xff08;两通道…

多区域OSPF路由配置

一、基础配置 1.搭建实验拓扑图 2.实验编址 具体如何配置可以看这一篇详细的博文&#xff1a;单区域OSPF实验-CSDN博客 3.分别检查六个路由器的配置&#xff1a; 使用命令display ip interface brief R1的配置 其他大家可以调出来&#xff0c;再与实验拓扑图进行比对&#…

机器人非线性系统反馈线性化与解耦

机器人非线性系统的反馈线性化和解耦是控制理论中的两个重要概念&#xff0c;它们分别用于简化系统分析和设计过程&#xff0c;提高控制系统的性能。 首先&#xff0c;反馈线性化是一种将非线性系统转化为线性系统的技术。在机器人控制中&#xff0c;由于机器人本身是一个强耦…

那些年我与c++的叫板(一)--string类自实现

引子&#xff1a;我们学习了c中的string类&#xff0c;那我们能不能像以前数据结构一样自己实现string类呢&#xff1f;以下是cplusplus下的string类&#xff0c;我们参考参考&#xff01; 废话不多说&#xff0c;直接代码实现&#xff1a;&#xff08;注意函数之间的复用&…

【C语言】5.C语言函数(2)

文章目录 7.嵌套调⽤和链式访问7.1 嵌套调⽤7.2 链式访问 8.函数的声明和定义8.1 单个⽂件8.2 多个⽂件8.3 static 和 extern8.3.1 static 修饰局部变量8.3.2 static 修饰全局变量8.3.3 static 修饰函数 7.嵌套调⽤和链式访问 7.1 嵌套调⽤ 嵌套调用就是函数之间的互相调用。…

1、sql server数据库进行sql注入

靶机取自&#xff1a;墨者sql server 1、判断数据库类型 抓包知sql server&#xff0c;所以注入语句跟MySQL有些区别 2、判断注入点 “http://219.153.49.228:42514/new_list.asp?id2 ”&#xff0c;当id2 and 11时显示正确&#xff0c;id2 and 12时页面报错。 3、确定列…

BERT for Joint Intent Classification and Slot Filling 论文阅读

BERT for Joint Intent Classification and Slot Filling 论文阅读 Abstract1 Introduction2 Related work3 Proposed Approach3.1 BERT3.2 Joint Intent Classification and Slot Filling3.3 Conditional Random Field 4 Experiments and Analysis4.1 Data4.2 Training Detail…

【董晓算法】动态规划之背包DP问题(2024.5.11)

前言&#xff1a; 本系列是学习了董晓老师所讲的知识点做的笔记 董晓算法的个人空间-董晓算法个人主页-哔哩哔哩视频 (bilibili.com) 动态规划系列 【董晓算法】动态规划之线性DP问题-CSDN博客 01背包 步骤&#xff1a; 分析容量j与w[i]的关系&#xff0c;然后分析是否…

FreeRTOS消息队列queue.c文件详解

消息队列的作用 消息队列主要用来传递消息&#xff0c;可以在任务与任务之间、中断与任务之间传递消息。 传递消息是通过复制的形式&#xff0c;发送方发送时需要不断复制&#xff0c;接收方接收时也需要不断复制。虽然会有内存资源的浪费&#xff0c;但是可以保证安全。 假…

为什么descriptor和data分离可以内存高效率

以下为例&#xff0c;需要有前面的6个bytes开始&#xff0c;用来处理数据&#xff0c;一旦这6个bytes有了即可以处理了。 Descriptor和data是同样的&#xff0c;只有descriptor有了&#xff0c;即可以开始处理data了。所以data不需要停留更长时间。

函数递归练习

目录 1.分析下面选择题 2.实现求第n个斐波那契数 3.编写一个函数实现n的k次方&#xff0c;使用递归实现。 4.写一个递归函数DigitSum(n)&#xff0c;输入一个非负整数&#xff0c;返回组成它的数字之和 5.递归方式实现打印一个整数的每一位 6.实现求n的阶乘 1.分析下面选择…

vs2022中添加头文件和声明

总结帖 数组存储 matlab中3维数组–>C中1维数组 数组转置函数 #include <stdio.h>// 转置二维数组 void transpose(int *src, int *dest, int rows, int cols) {for (int i 0; i < rows; i) {for (int j 0; j < cols; j) {dest[j * rows i] src[i * col…

解析C++ 网络输入输出缓冲区Buffer类的设计与实现(muduo库)

网络输入输出缓冲区&#xff08;Buffer&#xff09;是为了优化网络通信性能而设计的。通过将数据存储在缓冲区中&#xff0c;可以减少对网络的频繁访问&#xff0c;提高数据传输效率。缓冲区还可以帮助处理数据流中的突发性和短时延&#xff0c;使得数据的发送和接收更加稳定和…

最新版Ceph( Reef版本)块存储简单对接k8s(上集)

当前ceph 你的ceph集群上执行 1.创建名为k8s-rbd 的存储池 ceph osd pool create k8s-rbd 64 642.初始化 rbd pool init k8s-rbd3 创建k8s访问块设备的认证用户 ceph auth get-or-create client.kubernetes mon profile rbd osd profile rbd poolk8s-rbd部署 ceph-rbd-csi c…

C++map容器关联式容器

Cmap 1. 关联式容器 vector、list、deque、forward_list(C11)等STL容器&#xff0c;其底层为线性序列的数据结构&#xff0c;里面存储的是元素本身&#xff0c;这样的容器被统称为序列式容器。而map、set是一种关联式容器&#xff0c;关联式容器也是用来存储数据的&#xff0…