go语言 grpc 拦截器

文章目录

    • 拦截器
      • 服务端拦截器
        • 一元拦截器
        • 流拦截器
      • 客户端拦截器
        • 一元拦截器
        • 流拦截`
      • 多个拦截器
  • 代码仓库

拦截器

gRPC拦截器(interceptor)是一种函数,它可以在gRPC调用之前和之后执行一些逻辑,例如认证、授权、日志记录、监控和统计等。拦截器函数是gRPC中非常重要的概念,它允许我们在服务端和客户端添加自定义逻辑,以满足业务需求和运维需求。

在gRPC中,拦截器函数通常通过实现grpc.UnaryServerInterceptor和grpc.StreamServerInterceptor接口来定义。UnaryServerInterceptor用于拦截一元RPC请求,而StreamServerInterceptor用于拦截流式RPC请求。在客户端中,我们可以使用grpc.UnaryClientInterceptor和grpc.StreamClientInterceptor来拦截gRPC调用。

在gRPC中,拦截器函数可以被链接起来,形成一个拦截器链。在这个拦截器链中,每个拦截器函数都可以处理请求并将其转发给下一个拦截器函数,或者直接返回响应。因此,我们可以在拦截器函数中编写不同的逻辑,例如实现认证、授权、监控和统计等。
以下是一些常见的gRPC拦截器:

  • 认证和授权拦截器:用于对gRPC调用进行身份验证和权限控制,例如检查token、验证用户名和密码、检查访问控制列表等;
  • 日志记录拦截器:用于记录gRPC调用的日志,例如记录请求的方法、参数、响应状态等;
  • 监控和统计拦截器:用于监控gRPC调用的性能和吞吐量,例如记录调用次数、响应时间、错误率等;
  • 缓存拦截器:用于在服务端或客户端缓存一些数据,例如缓存计算结果、缓存数据库查询结果等。

服务端拦截器

在这里插入图片描述

一元拦截器
package main

import (
	"context"
	"flag"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	pb "mygrpc/proto/hello" // 引入编译生成的包
)

const (
	defaultName = "world"
)

var (
	addr = flag.String("addr", "localhost:50051", "the address to connect to")
	name = flag.String("name", defaultName, "Name to greet")
)

func main() {
	flag.Parse()
	// 与服务建立连接.
	conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	// 创建指定服务的客户端
	c := pb.NewGreeterClient(conn)

	// 连接服务器并打印出其响应。
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	// 调用指定方法
	r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	log.Printf("Greeting: %s", r.GetMessage())
}

结果

2023/12/07 14:52:55 ======= [Server Interceptor]  /hello.Greeter/SayHello
2023/12/07 14:52:55  Pre Proc Message : name:"world"
2023/12/07 14:52:55 Received: world
2023/12/07 14:52:55  Post Proc Message : message:"Hello world"

流拦截器

流式拦截器需要对grpc.ServerStream进行包装,重新实现RecvMsg和SendMsg方法。

func (s *server) SearchOrders(req *pb.HelloRequest, stream pb.Greeter_SearchOrdersServer) error {
	log.Printf("Recved %v", req.GetName())
	// 具体返回多少个response根据业务逻辑调整
	for i := 0; i < 10; i++ {
		// 通过 send 方法不断推送数据
		err := stream.Send(&pb.HelloReply{})
		if err != nil {
			log.Fatalf("Send error:%v", err)
			return err
		}
	}
	return nil
}

type wrappedStream struct {
	// 包装器流
	grpc.ServerStream
}
// 接受信息拦截器
func (w *wrappedStream) RecvMsg(m interface{}) error {
	log.Printf("====== [Server Stream Interceptor Wrapper] Receive a message (Type: %T) at %s", m, time.Now().Format(time.RFC3339))
	return w.ServerStream.RecvMsg(m)
}
// 发送消息拦截器
func (w *wrappedStream) SendMsg(m interface{}) error {
	log.Printf("====== [Server Stream Interceptor Wrapper] Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
	return w.ServerStream.SendMsg(m)
}

func newWrappedStream(s grpc.ServerStream) grpc.ServerStream {
	return &wrappedStream{s}
}

func orderServerStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	// 前置处理
	log.Println("====== [Server Stream Interceptor] ", info.FullMethod)

	// 包装器流调用 流RPC
	err := handler(srv, newWrappedStream(ss))
	if err != nil {
		log.Printf("RPC failed with error %v", err)
	}
	return err
}
func main() {
	flag.Parse()
	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	// 开启rpc
	s := grpc.NewServer(grpc.StreamInterceptor(orderServerStreamInterceptor))
	// 注册服务
	pb.RegisterGreeterServer(s, &server{})
	log.Printf("service listening at %v", lis.Addr())
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

结果

GOROOT=D:\software\Go #gosetup
GOPATH=D:\software\golibrary #gosetup
D:\software\Go\bin\go.exe build -o C:\Users\29071\AppData\Local\JetBrains\GoLand2023.3\tmp\GoLand\___go_build_mygrpc_service_steamInterceptorservice.exe mygrpc/service/steamInterceptorservice #gosetup
C:\Users\29071\AppData\Local\JetBrains\GoLand2023.3\tmp\GoLand\___go_build_mygrpc_service_steamInterceptorservice.exe
2023/12/07 15:07:48 service listening at [::]:50051
2023/12/07 15:08:07 ====== [Server Stream Interceptor]  /hello.Greeter/searchOrders
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Receive a message (Type: *hello.HelloRequest) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 Recved 开始服务端rpc流测试
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
Process finished with the exit code -1073741510 (0xC000013A: interrupted by Ctrl+C)


客户端拦截器

在这里插入图片描述

一元拦截器
func orderUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	// 前置处理逻辑
	log.Println("Method : " + method)

	// 调用invoker 执行远程方法
	err := invoker(ctx, method, req, reply, cc, opts...)

	// 后置处理逻辑
	log.Println(reply)
	return err
}

func main() {
	flag.Parse()
	// 与服务建立连接.
	conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithUnaryInterceptor(orderUnaryClientInterceptor)) //添加拦截器
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	// 创建指定服务的客户端
	c := pb.NewGreeterClient(conn)

	// 连接服务器并打印出其响应。
	ctx, cancel := context.WithTimeout(context.Background(), time.Second) // 设置超时时间为一秒
	defer cancel()
	// 调用指定方法
	r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	log.Printf("Greeting: %s", r.GetMessage())
}

结果

2023/12/07 16:37:28 Method : /hello.Greeter/SayHello
2023/12/07 16:37:28 message:"Hello world"
2023/12/07 16:37:28 Greeting: Hello worl
流拦截`
type wrappedStream struct {
	grpc.ClientStream
}

func (w *wrappedStream) RecvMsg(m interface{}) error {
	log.Printf("====== [Client Stream Interceptor] Receive a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
	return w.ClientStream.RecvMsg(m)
}

func (w *wrappedStream) SendMsg(m interface{}) error {
	log.Printf("====== [Client Stream Interceptor] Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
	return w.ClientStream.SendMsg(m)
}

func newWrappedStream(s grpc.ClientStream) grpc.ClientStream {
	return &wrappedStream{s}
}

func clientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
	
  // 前置处理逻辑
	log.Println("======= [Client Interceptor] ", method)
  
  // 调用streamer 来获取客户端流
	s, err := streamer(ctx, desc, cc, method, opts...)
	if err != nil {
		return nil, err
	}
	return newWrappedStream(s), nil
}

func main(){
   // 注册拦截器到客户端流
 conn,err:=grpc.Dial(address,grpc.WithInsecure(),grpc.WithStreamInterceptor(clientStreamInterceptor))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
                     
	c := pb.NewOrderManagementClient(conn)
  ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
  
  // 调用客户端流RPC方法
  searchStream, _ := c.SearchOrders(ctx, &wrapper.StringValue{Value: "Google"})
	for {
		searchOrder, err := searchStream.Recv()
		if err == io.EOF {
			log.Print("EOF")
			break
		}
		if err == nil {
			log.Print("Search Result : ", searchOrder)
		}
	}
}

结果

2023/12/07 17:10:43 ====== [Client Stream Interceptor] Send a message (Type: *hello.HelloRequest) at 2023-12-07T17:10:43+08:00
2023/12/07 17:10:43 ====== [Client Stream Interceptor] Send a message (Type: *hello.HelloRequest) at 2023-12-07T17:10:43+08:00
2023/12/07 17:10:43 客户端流传输结束

多个拦截器

在grpc中默认的拦截器不可以传多个,因为在源码中,存在一些问题

func chainUnaryClientInterceptors(cc *ClientConn) {
	interceptors := cc.dopts.chainUnaryInts
	if cc.dopts.unaryInt != nil {
		interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
	}
	var chainedInt UnaryClientInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
			return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
		}
	}
	cc.dopts.unaryInt = chainedInt
}

当存在多个拦截器时,取的就是第一个拦截器。因此结论是允许传多个,但并没有用。

如果真的需要多个拦截器,可以使用 go-grpc-middleware 提供的 grpc.UnaryInterceptor 和 grpc.StreamInterceptor 链式方法。核心方法如下

func ChainUnaryClient(interceptors ...grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {
	n := len(interceptors)
	if n > 1 {
		lastI := n - 1
		return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
			var (
				chainHandler grpc.UnaryInvoker
				curI         int
			)

			chainHandler = func(currentCtx context.Context, currentMethod string, currentReq, currentRepl interface{}, currentConn *grpc.ClientConn, currentOpts ...grpc.CallOption) error {
				if curI == lastI {
					return invoker(currentCtx, currentMethod, currentReq, currentRepl, currentConn, currentOpts...)
				}
				curI++
				err := interceptors[curI](currentCtx, currentMethod, currentReq, currentRepl, currentConn, chainHandler, currentOpts...)
				curI--
				return err
			}

			return interceptors[0](ctx, method, req, reply, cc, chainHandler, opts...)
		}
	}
    ...
}

代码仓库

https://github.com/onenewcode/mygrpc.git

也可以直接下载绑定的资源。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/229931.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

CSS——标准流、浮动、Flex布局

1、标准流 标准流也叫文档流&#xff0c;指的是标签在页面中默认的排布规则&#xff0c;例如&#xff1a;块元素独占一行&#xff0c;行内元素可以一行显示多个。 2、浮动 作用&#xff1a;让块元素水平排列 属性名&#xff1a;float 属性值&#xff1a; left&#xff1a;…

ubuntu22.04 安装nvidia GPU显卡驱动

下载 https://www.nvidia.com/Download/index.aspx 删除原有的NVIDIA驱动程序 sudo apt-get remove –purge nvidia*禁用nouveau 默认情况下&#xff0c;Ubuntu采用开源的nouveau驱动程序作为Nvidia显卡的驱动&#xff0c;需要将其禁用 打开编辑配置文件&#xff1a; sudo …

【MySQL】MySQL的varchar字段最大长度是65535?

在MySQL建表sql里,我们经常会有定义字符串类型的需求。 CREATE TABLE `user` ( `name` varchar(100) NOT NULL DEFAULT COMMENT 名字) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ; 比方说user表里的名字,就是个字符串。MySQL里有两个类型比较适合这个场景。 char和varchar。…

Python GUI教程:轻松构建用户界面

大家好&#xff0c;从网络开发到数据科学&#xff0c;Python被广泛应用于各个领域。本文将探索Python内置的用于创建图形用户界面&#xff08;GUI&#xff09;的库&#xff1a;Tkinter。无论是初学者还是经验丰富的开发者&#xff0c;了解如何创建Python GUI都可以增强构建交互…

Python VSCode 配置固定的脚本入口

Python VSCode 配置固定的脚本入口 打开或者新建一个启动配置 选择 .vscode目录下 launch.json文件 将 “program”: “${file}” 替换成 “program”: “mian.py”, //完成你自己的入口.py文件名即可 json启动配置文件 {// Use IntelliSense to learn about possible attrib…

微信小程序引入Vant Weapp修改样式不起作用,使用外部样式类进行覆盖

一、引入Vant Weapp后样式问题 在项目中使用第三方组件修改css样式时,总是出现各种各样问题,修改的css样式不起作用,没有效果,效果不符合预期等。 栗子(引入一个搜索框组件)实现效果: 左侧有一个搜索文字背景为蓝色,接着跟一个搜索框 wxml <view class"container&q…

解决Idea右侧无Maven选项的问题

在创建 Spring / SpringBoot 项目的时候可能会遇到没有 Maven 选项的问题&#xff0c;如下图&#xff1a; 我们通常这样解决&#xff1a;

解决微信小程序中 ‘nbsp;‘ 空格不生效的问题

在微信小程序开发中&#xff0c;我们经常会使用 来表示一个空格。这是因为在 HTML 中&#xff0c;空格会被解析为一个普通字符&#xff0c;而不会产生实际的空白间距。而 是一种特殊的字符实体&#xff0c;它被解析为一个不可见的空格&#xff0c;可以在页面上产生真正的空…

从互联网到云计算再到 AI 原生,百度智能云数据库的演进

1 数据库行业发展概述 如果说今年科技圈什么最火&#xff0c;我估计大家会毫不犹豫选择 ChatGPT。ChatGPT 是 2022 年 11 月 30 日由 OpenAI 发布的聊天应用。它创造了有史以来用户增长最快的纪录&#xff1a;自 11 月 30 日发布起&#xff0c;5 天就拥有了 100 万活跃用户&am…

【微软技术栈】发布自己造的轮子 -- 创建Nuget包(分布操作)

目录 1、您的项目 2、创建 .nuspec 文件 3、一张图片胜过一千个拉取请求 4、包括自述文件 MD 文件 5、构建软件包 6、将包部署到 Nuget.Org 7、手动上传软件包 8、自动化和脚本化部署 9、我们如何构建和部署 ErrLog.IO Nuget 包 10、Nuget统计数据 11、最后的思考 创建 Nuget 包…

Vue3使用Tailwind CSS

安装 Tailwind 以及其它依赖项 npm install -D tailwindcsslatest postcsslatest autoprefixerlatest生成配置文件&#xff1a; npx tailwindcss init -p.修改配置文件 tailwind.config.js 2.6版本 &#xff1a; module.exports {purge: [./index.html, ./src/**/*.{vue,j…

结合ColorUI组件开发微信小程序

1.自定义组件生命周期函数&#xff1a; Component({data: {},attached() {console.log("自定义组件生命周期函数 attached--先执行");this.getPos();},ready() {console.log("ready生命周期函数---在attached之后执行")},methods: {getPos() {var that th…

macOS Big Sur/Mac电脑安装vscode显示您没有权限来打开应用程序‘Visual Studio Code‘ 请联系您的电脑或网络管理员问题修复

错误方法 首先我以为我的权限不足。&#xff0c;需要去用户群组里设置。结果根本不是这个的问题。 1.在系统偏好设置->用户与群组检查了一下我的用户是不是管理员 结果发现是管理员 2.根据苹果提示&#xff0c;右键我的文件夹->显示简介->最下面的共享与权限 解锁&…

【Linux】进程间通信之管道--命名管道匿名管道通信进程池设计

文章目录 一、进程间通信介绍1.什么是通信2.为什么要有通信以及如何进行通信 二、管道1.什么是管道2.匿名管道2.1什么是匿名管道2.2接口认识--pipe2.3进程间通信之匿名管道实现2.4读写特征2.5管道的特征2.6 进程池设计 3.命名管道3.1创建一个命名管道3.2用命名管道实现server&a…

大数据技术3:数据仓库的ETL和分层模型

前言&#xff1a;我们先了解一下数据仓库架构的演变过程。 1 、数据仓库定义 数据仓库是一个面向主题的&#xff08;Subject Oriented&#xff09;、集成的&#xff08;Integrate&#xff09;、相对稳定的&#xff08;Non-Volatile&#xff09;、反映历史变化&#xff08;Time…

3GPP标准查看、下载和几个UE相关系列标准

由于一直做终端侧协议。最近以UE为核心重新下载了一系列文档。 总结并举例一下分类标准。 如何查看3GPP标准列表 实际上在3GPP网站如下链接&#xff1a;Specifications by Series&#xff0c;每个系列以及分类都说的很清楚。 几个系列分类举例 和终端协议层工作比较关系密切…

鸿蒙OS应用开发之最简单的程序

鸿蒙OS应用开发之最简单的程序 前面介绍怎么样安装鸿蒙应用开发的环境&#xff0c;然后试着运行起来&#xff0c;并安装运行的虚拟机&#xff0c;以及对应9.0版本的API和SDK等软件。这样就具备了基本的开发基础&#xff0c;就可以进入创建应用程序开发了。 在我们起飞之前&…

Navicat 技术指引 | 适用于 GaussDB 分布式的日志查询与配置设置

Navicat Premium&#xff08;16.3.3 Windows 版或以上&#xff09;正式支持 GaussDB 分布式数据库。GaussDB 分布式模式更适合对系统可用性和数据处理能力要求较高的场景。Navicat 工具不仅提供可视化数据查看和编辑功能&#xff0c;还提供强大的高阶功能&#xff08;如模型、结…

c/c++中一些不常用但有用的知识

1.变长数组 bool fun(int cnt) {unsigned char data[cnt];return true; } 在 C 语言中&#xff0c;变长数组&#xff08;Variable Length Arrays&#xff0c;VLA&#xff09;是 C99 标准引入的特性&#xff0c;允许使用变量来定义数组的长度。因此&#xff0c;在 C 版本的代码…

遥测终端机RTU:实现远程监测和控制的重要工具

遥测终端机RTU对设备进行远程监测和控制&#xff0c;支持采集和传输数据&#xff0c;以实现对工业过程、公用事业、水文和环境的监测和管理。 遥测终端机RTU工作原理 计讯物联遥测终端机RTU通过网口、串口进行传感器/设备等现场数据采集&#xff0c;将其转换为数字信号&#xf…