一. OpenSSL
1.1 介绍
OpenSSL是一个开放源代码的软件库包,用于支持网络通讯过程中的加密。这个库提供的功能包含了SSL和TLS协议的实现,并可用于生成密钥、证书、进行密码运算等。
其组成主要包括一下三个组件:
-
openssl:多用途的命令行工具
-
libcrypto:加密算法库
-
libssl:加密模块应用库,实现了ssl及tls
openssl可以实现秘钥证书管理、对称加密和非对称加密 。
官网:[ Downloads ] - /source/index.html
1.2 Windows安装方法
OpenSSL官网没有提供windows版本的安装包,可以选择其它开源平台提供的工具。
Win32/Win64 OpenSSL Installer for Windows - Shining Light Productions
以该工具为例:
进入下载界面,选择下载的版本,下载完,之后安装即可。
1.3 生成公钥和私钥
openssl命令详解-CSDN博客
生成私钥:openssl genrsa -out rsa_private_key.pem 1024
生成公钥:openssl rsa -in rsa_private_key.pem -pubout -out rsa_public_key.pem
二. gRPC认证
gRPC默认内置了两种认证方式:
- SSL/TLS认证
- 基于Token的认证
同时,gRPC提供了接口用于扩展自定义认证方式。
2.1 TLS认证
2.1.1 什么是TLS认证
TLS(Transport Layer Security,安全传输层),TLS是建立在传输层TCP协议之上的协议,服务于应用层,它的前身是SSL(Secure Socket Layer,安全套接字层),它实现了将应用层的报文进行加密后再交由TCP进行传输的功能。
2.1.2 TLS的作用
TLS协议主要解决如下三个网络安全问题。
- 保密(message privacy),保密通过加密encryption实现,所有信息都加密传输,第三方无法嗅探;
- 完整性(message integrity),通过MAC校验机制,一旦被篡改,通信双方会立刻发现;
- 认证(mutual authentication),双方认证,双方都可以配备证书,防止身份被冒充;
2.1.3 TLS认证实例
- 证书制作
制作公钥:自签名公钥(x509), 制作私钥。
#生成一个名为server_private.key的RSA私钥,使用SHA256算法和4096位密钥长度。然后使用该私钥生成一个有效期为36500天的自签名证书,并将其保存为名为server.pem的文件。同时在证书中添加subjectAltName扩展,指定DNS名称为www.wy.com。
openssl req -newkey rsa:4096 -nodes -sha256 -keyout server_private.key -x509 -days 36500 -out server.pem -addext "subjectAltName =DNS:www.wy.com"
- openssl req:生成自签名证书
- -newkey rsa:4096 :生成新的4096位rsa密钥对
- -sha256:使用sha256加密
- -keyout:指定生成的私钥文件
- -x509:指输出证书
- -days 36500:有效期 36500
- -out:输出证书的文件名
- -addext:添加扩展
注意需要在证书中添加subjectAltName扩展,指定DNS名称。不然在客户端连接服务器时会报错,报错信息为:
rpc error: code = Unavailable desc = connection error: desc = "transport: authentication handshake failed: tls: failed to verify certificate: x509: certificate relies on legacy Common Name field, use SANs instead"
因为
go1.15
版本开始废弃CommonName
,因此推荐使用SAN
证书。如果想兼容之前的方式,需要设置环境变量GODEBUG
为x509ignoreCN=0
。(创建 SSL/TLS 证书时,证书依赖于传统的 Common Name (CN) 字段,而没有使用现代标准所推荐的 Subject Alternative Names (SANs) 字段。现代的 TLS 客户端(比如最新版本的浏览器和安全工具)要求证书使用 SANs 字段来指定有效的主机名。)
自定义信息:
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [AU]:CN #国家
State or Province Name (full name) [Some-State]:SHANGHAI #省份
Locality Name (eg, city) []:SHANGHAI #城市
Organization Name (eg, company) [Internet Widgits Pty Ltd]:BF #公司
Organizational Unit Name (eg, section) []:Dev #部门
Common Name (e.g. server FQDN or YOUR name) []:www.wy.com #服务器名称
Email Address []:xxx@xxx.com #邮箱地址
- 目录结构
- 示例代码
服务端代码:
package main
import (
"context"
"fmt"
"net"
hello "sample-app/grpc/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials" //引入gRPC认证包
)
const (
//服务器地址
Addr = "127.0.0.1:8080"
)
type helloService struct{}
//定义hello 服务
var HelloService = helloService{}
//实现proto hello service方法
func (h helloService) SayHello(c context.Context, req *hello.HelloRequest) (*hello.HelloResponse, error) {
resp := new(hello.HelloResponse)
resp.Message = fmt.Sprintf("Hello %s", req.Name)
return resp, nil
}
func main() {
ls, err := net.Listen("tcp", Addr)
if err != nil {
fmt.Println(err)
return
}
//TLS认证
cert, err := credentials.NewServerTLSFromFile("..\\..\\key\\server.pem", "..\\..\\key\\server_private.key")
if err != nil {
fmt.Println(err)
return
}
//新建一个grpc服务器,并开启TLS认证
//上面监听并没有进行连接客户端
server := grpc.NewServer(grpc.Creds(cert))
//注册HelloService
hello.RegisterHelloServer(server, HelloService)
fmt.Println("Listen on" + Addr + "with TLS")
//这里面才会连接客户端,需要进行认证
server.Serve(ls)
}
-
credentials.NewServerTLSFromFile
:从输入证书文件和密钥文件为服务端构造TLS凭证 -
grpc.Creds
:返回一个ServerOption,用于设置服务器连接的凭证。
客户端代码:
package main
import (
"context"
"fmt"
hello "sample-app/grpc/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
const (、
//gRPC服务器地址
Addr = "127.0.0.1:8080"
)
func main() {
//TLS连接
cert, err := credentials.NewClientTLSFromFile("..\\..\\key\\server.pem", "www.wy.com")
if err != nil {
fmt.Println("credentials fail ", err)
return
}
//请求连接的时候 需要认证
conn, err := grpc.Dial(Addr, grpc.WithTransportCredentials(cert))
if err != nil {
fmt.Println("Dial fail", err)
return
}
defer conn.Close()
c := hello.NewHelloClient(conn)
req := new(hello.HelloRequest)
req.Name = "gRPC"
resp, err := c.SayHello(context.Background(), req)
if err != nil {
fmt.Println("say hello fail", err)
return
}
fmt.Println(resp.Message)
}
-
credentials.NewClientTLSFromFile
:从输入的证书文件中为客户端构造TLS凭证。 -
grpc.WithTransportCredentials
:配置连接级别的安全凭证(例如,TLS/SSL),返回一个DialOption,用于连接服务器。
proto文件:
syntax="proto3";
package hello;
option go_package="hello";
service Hello
{
rpc SayHello(HelloRequest)returns(HelloResponse){};
}
message HelloRequest
{
string name = 1;
}
message HelloResponse
{
string message = 1;
}
使用下面命令生成pb.go文件:
protoc --go_out=plugins=grpc:"生成pb.go文件地址" -I="proto文件地址" "proto文件地址\文件"
演示:
实际TLS认证不是这样,客户端和服务器时分离的。客户端有证书(包含公钥),服务端有证书和私钥。
客户端发送请求给服务器请求连接,服务器将证书通过私钥加密后发送给客户端。客户端有证书,里面包含服务器私钥对应的公钥。使用公钥对数据进行解密,获得证书数据,与本地证书数据进行比较。
2.2 Token认证
继续扩展上面的代码,实现TLS+Token认证机制。
2.2.1 什么是Token认证
Token认证是一种基于Token的身份验证方法,用于在客户端和服务器之间进行身份验证。以下是Token认证的主要概念、流程以及优缺点:
-
主要概念
- Token的含义:Token(令牌)是服务端生成的一串字符串,作为客户端进行请求的一个标识。
- Token的组成:一般包括用户身份标识(uid)、时间戳(time)和签名(sign)等元素。
- Token的作用:Token主要用于身份验证、授权、会话管理和跨域资源共享(CORS)等方面。
-
认证流程
- 用户登录并获取Token:用户使用用户名和密码登录,成功后服务端生成Token并发送给客户端。
- 客户端存储和使用Token:客户端将Token保存在本地(如cookie或localStorage),并在后续请求中携带该Token。
- 服务端验证Token:服务端收到请求后,验证Token的合法性,若合法则处理请求并返回数据。
2.2.2 示例代码
根据上面的代码,实现TLS+Token认证机制。
- 认证原理
客户端发送请求,会将Token放到context.Context上下文中,服务器收到请求,从上下文中获取Token验证,然后进行下一步操作。
- 目录结构
- 客户端代码
grpc/credential包内默认定义了PerRPCCredentials接口,是提供用于自定义接口,他的作用是将所需安全认证信息添加到每个RPC上下文中。其包含两个方法。
type PerRPCCredentials interface {
//获取当前请求认证所需的元数据
GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)
//是否需要基于TLS认证进行安全传输
RequireTransportSecurity() bool
}
package main
import (
"context"
"fmt"
hello "sample-app/grpc/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
const (
Addr = "127.0.0.1:8080"
//是否使用TLS
OpenTLS = true
)
// 自定义认证
type Token struct {
Appid string
Appkey string
}
// 实现自定义认证方法
func (t Token) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
"appid": t.Appid,
"appkey": t.Appkey,
}, nil
}
// 实现自定义认证是否开启TLS
func (t Token) RequireTransportSecurity() bool {
return OpenTLS
}
func main() {
//TLS连接
var opt []grpc.DialOption
if OpenTLS {
cert, err := credentials.NewClientTLSFromFile("..\\..\\key\\server.pem", "www.wy.com")
if err != nil {
fmt.Println("credentials fail ", err)
return
}
opt = append(opt, grpc.WithTransportCredentials(cert))
} else {
opt = append(opt, grpc.WithInsecure())
}
//使用自定义认证
tk := Token{
Appid: "101010",
Appkey: "i am a key",
}
opt = append(opt, grpc.WithPerRPCCredentials(&tk))
conn, err := grpc.Dial(Addr, opt...)
if err != nil {
fmt.Println("Dial fail", err)
return
}
defer conn.Close()
//初始化服务器
c := hello.NewHelloClient(conn)
req := new(hello.HelloRequest)
req.Name = "gRPC"
resp, err := c.SayHello(context.Background(), req)
if err != nil {
fmt.Println("say hello fail", err)
return
}
fmt.Println(resp.Message)
}
定义一个结构Token,包含Token所需属性字段。实现PerRPCCredentials接口的两个方法。每次调用token信息会通过请求metadata传输到服务端。
下面查看服务端如何获取metadata中信息。
- 服务端代码
使用metadata.FromIncomingContext:从上下文中获取元数据。
package main
import (
"context"
"fmt"
"net"
hello "sample-app/grpc/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials" //引入gRPC认证包
"google.golang.org/grpc/metadata"
)
var Addr = "127.0.0.1:8080"
type helloService struct{}
var HelloService = helloService{}
func (h helloService) SayHello(c context.Context, req *hello.HelloRequest) (*hello.HelloResponse, error) {
//认证
md, ok := metadata.FromIncomingContext(c)
if !ok {
return nil, grpc.Errorf(codes.Unauthenticated, "无Token认证信息")
}
var appid string
var appkey string
vals := md.Get("appid")
if len(vals) != 0 {
appid = vals[0]
}
val_key := md.Get("appkey")
if len(val_key) != 0 {
appkey = val_key[0]
}
//认证token
if appid != "101010" || appkey != "i am a key" {
return nil, grpc.Errorf(codes.Unauthenticated, "Token认证信息错误: Appid:%s, Appkey:%s", appid, appkey)
}
//fmt.Println("authenticated succ " + appid + "-" + appkey)
resp := new(hello.HelloResponse)
resp.Message = fmt.Sprintf("Hello %s \nToken info: Appid=%s, AppKey=%s", req.Name, appid, appkey)
return resp, nil
}
func main() {
ls, err := net.Listen("tcp", Addr)
if err != nil {
fmt.Println(err)
return
}
//TLS认证
cert, err := credentials.NewServerTLSFromFile("..\\..\\key\\server.pem", "..\\..\\key\\server_private.key")
if err != nil {
fmt.Println(err)
return
}
//新建一个grpc服务器,并开启TLS认证
server := grpc.NewServer(grpc.Creds(cert))
//注册HelloService
hello.RegisterHelloServer(server, HelloService)
fmt.Println("Listen on " + Addr + " with TLS")
server.Serve(ls)
}
- 演示
成功:
失败:
补充:
google.golang.org/grpc/credentials/oauth 包已实现了用于 Google API 的 oauth 和 jwt 验证的方法,使用方法可以参考 官方文档 。在实际应用中,我们可以根据自己的业务需求实现合适的验证方 式。
三. 拦截器
grpc服务端和客户端都提供了interceptor功能,可以在请求前后处理一些通用逻辑,比如:记录日志,tracing,身份认证等。
在上面自定义Token认证的示例中,认证信息是由每个服务中的方法处理并认证,如果有大量的接口,这种姿势就不优雅了,每一个接口实现都需要先处理认证信息。这个时候interceptor就可以用来解决这个问题,在请求被转到具体接口之前处理认证信息,一处认证到处无忧。在客户端,我们增加一个请求日志,记录请求相关的参数和耗时等。
3.1 grpc的interceptor
gRPC服务端和客户端均可实现各自的拦截器,根据rpc的两种请求方式可分为两种:
- Unary Interceptor(一元拦截器)
- Stream Interceptor(流式拦截器)
rpc的两种请求方式:
- 一元请求(Unary):客户端发送一个请求给服务端,然后立即得到一个响应,但是服务器端并不能主动向客户端发送消息。
- 流式请求:当数据量比较大,可能会对响应时间产生影响,可以使用流式来分批次来传输数据,不需要一次性就处理完数据。
- 服务端流式请求(server streaming):客户端发送一个请求给服务端,然后服务端就会向客户端逐渐返回一系列的消息。
- 客户端流式请求(client streaming):客户端就像一个流,连续发送多个消息给服务端,然后等待服务端的一个响应。
- 双向流式请求(Bidirectional streaming):客户端和服务端可以连续互发消息。
对应Protobuf文件service定义:
下面是一个例子:
一元请求:request和response不需要stream。
服务端流式请求:request不需要stream,response需要stream
客户端流式请求:request需要stream,response不需要stream。
双向流式请求:request和response都需要stream。
syntax = "proto3"; option go_package = "github.com/lixd/grpc-go-example/features/proto/echo"; package echo; // Echo 服务,包含了4种类型API service Echo { // UnaryAPI rpc UnaryEcho(EchoRequest) returns (EchoResponse) {} // SServerStreaming rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {} // ClientStreamingE rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {} // BidirectionalStreaming rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {} } message EchoRequest { string message = 1; } message EchoResponse { string message = 1; }
3.2 一元拦截器
对于一元服务器拦截器,只需要定义UnaryServerInterceptor方法即可。其中,handler(ctx, req)即调用rpc方法。
type UnaryServerInterceptor func(
ctx context.Context, //rpc上下文
req interface{}, //rpc请求参数
info *UnaryServerInfo, //rpc方法信息
handler UnaryHandler, //rpc方法本身,真正执行逻辑
)(interface{}, error){
return handler(ctx, req)
}
对于一元客户端拦截器,一样需要定义一个方法UnaryClientInterceptor,其中invoker()才真正请求rpc。
type UnaryClientInterceptor func(
ctx context.Context, //rpc上下文
method string, //调用方法名
req, //rpc请求参数
reply interface{}, //rpc响应结果
cc *ClientConn, //连接句柄
invoker UnaryInvoker, //调用rpc本身方法
opts ...CallOption //调用配置
) error {
return invoker(ctx, method, req, reply, cc, opts...)
}
一元拦截器的实现,根据handle和invoker的前后:调用前预处理,调用rpc方法,调用后处理。
- 服务端代码
package main
import (
"context"
"fmt"
"net"
hello "sample-app/grpc/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials" //引入gRPC认证包
"google.golang.org/grpc/metadata"
)
var Addr = "127.0.0.1:8080"
type helloService struct{}
var HelloService = helloService{}
func (h helloService) SayHello(c context.Context, req *hello.HelloRequest) (*hello.HelloResponse, error) {
//fmt.Println("authenticated succ " + appid + "-" + appkey)
resp := new(hello.HelloResponse)
resp.Message = fmt.Sprintf("Hello %s", req.Name)
return resp, nil
}
func main() {
ls, err := net.Listen("tcp", Addr)
if err != nil {
fmt.Println(err)
return
}
//TLS认证
cert, err := credentials.NewServerTLSFromFile("..\\..\\key\\server.pem", "..\\..\\key\\server_private.key")
if err != nil {
fmt.Println(err)
return
}
var opts []grpc.ServerOption
//开启TLS认证
opts = append(opts, grpc.Creds(cert))
//注册拦截器
opts = append(opts, grpc.UnaryInterceptor(UnaryServerInterceptor))
//server := grpc.NewServer(grpc.Creds(cert))
server := grpc.NewServer(opts...)
//注册HelloService
hello.RegisterHelloServer(server, HelloService)
fmt.Println("Listen on " + Addr + " with TLS " + "Interceptor")
server.Serve(ls)
}
func auth(ctx context.Context) error {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return grpc.Errorf(codes.Unauthenticated, "无Token认证信息")
}
var appid string
var appkey string
vals := md.Get("appid")
if len(vals) != 0 {
appid = vals[0]
}
val_key := md.Get("appkey")
if len(val_key) != 0 {
appkey = val_key[0]
}
//认证token
if appid != "101010" || appkey != "i am a key" {
return grpc.Errorf(codes.Unauthenticated, "Token认证信息错误: Appid:%s, Appkey:%s", appid, appkey)
}
return nil
}
func UnaryServerInterceptor(ctx context.Context, req interface{}, into *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
err = auth(ctx)
if err != nil {
return nil, err
}
return handler(ctx, req)
}
- 客户端代码
package main
import (
"context"
"fmt"
"log"
hello "sample-app/grpc/proto"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
const (
Addr = "127.0.0.1:8080"
//是否使用TLS
OpenTLS = true
)
// 自定义认证
type Token struct {
Appid string
Appkey string
}
// 实现自定义认证方法
func (t Token) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
"appid": t.Appid,
"appkey": t.Appkey,
}, nil
}
// 实现自定义认证是否开启TLS
func (t Token) RequireTransportSecurity() bool {
return OpenTLS
}
func main() {
//TLS连接
var opt []grpc.DialOption
if OpenTLS {
cert, err := credentials.NewClientTLSFromFile("..\\..\\key\\server.pem", "www.wy.com")
if err != nil {
fmt.Println("credentials fail ", err)
return
}
opt = append(opt, grpc.WithTransportCredentials(cert))
} else {
opt = append(opt, grpc.WithInsecure())
}
//使用自定义认证
tk := Token{
Appid: "101010",
Appkey: "i am not a key",
}
opt = append(opt, grpc.WithPerRPCCredentials(&tk))
//加入拦截器
opt = append(opt, grpc.WithUnaryInterceptor(UnaryClientInterceptor))
conn, err := grpc.Dial(Addr, opt...)
if err != nil {
fmt.Println("Dial fail", err)
return
}
defer conn.Close()
//初始化服务器
c := hello.NewHelloClient(conn)
req := new(hello.HelloRequest)
req.Name = "gRPC"
resp, err := c.SayHello(context.Background(), req)
if err != nil {
fmt.Println("say hello fail", err)
return
}
fmt.Println(resp.Message)
}
func UnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
stat_time := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
log.Printf("method=%s, req=%v, reply=%v, duration=%s, error=%v\n", method, req, reply, time.Since(stat_time), err)
return err
}
- 运行结果
认证成功:
认证失败:
3.3 流式拦截器
流式拦截器的实现与一元拦截器一致,实现提供的方法即可,方法参数含义如下:
type StreamServerInterceptor func(
srv interface{}, //rpc请求参数
ss ServerStream, //服务端stream对象
info *StreamServerInfo, //rpc方法信息
handler StreamHandler //rpc方法本身,真正执行逻辑
) (err error) {
return handler(src, ss)
}
type StreamClientInterceptor func(
ctx context.Context,//rpc上下文
desc *StreamDesc, //流信息
cc *ClientConn, //连接句柄
method string, //调用方法名
streamer Streamer, //调用rpc方法本身
opts ...CallOption //调用配置
) (ClientStream, error) {
//流操作预处理
clientStream, err := streamer(ctx, desc, cc, method, opts...)
//根据某些条件,通过clientStream拦截流操作
return clientStream, err
}
与其他拦截器不同,客户端流式拦截器的实现分为两个部分,流操作预处理和流操作拦截,其不能在事后进行rpc方法调用,只能通过ClientStream对象进行操作拦截。即需要进行rpc方法调用后,才能进行操作拦截。例如根据特定的metadata,调用ClientStream.CloseSend()终止流操作。
下面实现了一个打印请求和响应日志的拦截器,只是函数签名变成了grpc.StreamServerInterceptor 和 grpc.StreamClientInterceptor 。
- proto文件
syntax="proto3";
package hello;
option go_package="hello";
service Hello
{
rpc SayHello(HelloRequest)returns(HelloResponse){};
//双向流式请求
rpc Streaming(stream StreamRequest) returns (s StreamResponse) {}
}
message HelloRequest
{
string name = 1;
}
message HelloResponse
{
string message = 1;
}
message StreamRequest
{
string input = 1;
}
message StreamResponse
{
string output = 1;
}
使用下面命令生成go语言文件:
protoc --go_out=plugins=grpc:.\grpc\proto -I=.\grpc\proto .\grpc\proto\hello.proto
- 服务端代码
服务端定义结构体,实现了proto文件HelloServer接口,即实现了SayHello和Streaming方法。
实现了拦截器方法StreamServerInterceptor(函数名随便定义)。
服务端的实现其实和一元拦截器的使用方式没什么太大区别,但是流的特性在于请求和响应不是一次性完成的,而是多次发送和接收数据。所以我们可能需要在发送和接收数据的过程中处理一些公共逻辑。这才是拦截器特别的地方。
我们注意到handler方法调用的第二个参数grpc.ServerStream接口类型,这个类型包含了SendMsg和RecvMsg方法,所以我们可以使用一个自定义类型实现这个接口,并重写这两个方法,我们就可以实现打印日志的目的。
package main
import (
"context"
"fmt"
"io"
"log"
"net"
hello "sample-app/grpc/proto"
"strconv"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials" //引入gRPC认证包
"google.golang.org/grpc/metadata"
)
var Addr = "127.0.0.1:8080"
type helloService struct{}
var HelloService = helloService{}
func (h helloService) SayHello(c context.Context, req *hello.HelloRequest) (*hello.HelloResponse, error) {
resp := new(hello.HelloResponse)
resp.Message = fmt.Sprintf("Hello %s", req.Name)
return resp, nil
}
func (h helloService) Streaming(stream hello.Hello_StreamingServer) error {
for n := 0; ; {
res, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
v, _ := strconv.Atoi(res.Input)
log.Printf("[server streaming] recv : %d\n", v)
n += v
log.Printf("[server streaming] send : %d\n", n)
stream.Send(&hello.StreamResponse{Output: strconv.Itoa(n)})
}
}
func main() {
ls, err := net.Listen("tcp", Addr)
if err != nil {
fmt.Println(err)
return
}
//TLS认证
cert, err := credentials.NewServerTLSFromFile("..\\..\\key\\server.pem", "..\\..\\key\\server_private.key")
if err != nil {
fmt.Println(err)
return
}
var opts []grpc.ServerOption
//开启TLS认证
opts = append(opts, grpc.Creds(cert))
//注册拦截器
opts = append(opts, grpc.StreamInterceptor(StreamServerInterceptor))
//server := grpc.NewServer(grpc.Creds(cert))
server := grpc.NewServer(opts...)
//注册HelloService
hello.RegisterHelloServer(server, HelloService)
fmt.Println("Listen on " + Addr + " with TLS " + "with StreamInterceptor")
server.Serve(ls)
}
// 里面定义grpc.ServerStream接口类型的属性
// 是为了重写SendMsg和RecvMsg方法
type serverStream struct {
//需要使用匿名字段
//内嵌方法重写
grpc.ServerStream
}
// 重写ServerStream的SendMsg方法
func (s serverStream) SendMsg(m interface{}) error {
//发送数据前处理
log.Printf("[server SendMsg]: send : %T\n", m)
return s.ServerStream.SendMsg(m)
}
// 重写ServerStream的RecvMsg方法
func (s serverStream) RecvMsg(m interface{}) error {
//接收数据前处理
log.Printf("[server Recv Stream]: recv : %T\n", m)
return s.ServerStream.RecvMsg(m)
}
// 流式拦截器
func StreamServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
//前置逻辑
log.Printf("[StreamServerInterceptor] accept request : %s\n", info.FullMethod)
arg := serverStream{ss}
err := handler(srv, arg)
return err
}
- 客户端代码
客户端代码和服务端代码类似,只是对应数据处理的接口变成了grpc.ClientStream。
package main
import (
"context"
"fmt"
"io"
"log"
hello "sample-app/grpc/proto"
"strconv"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
const (
Addr = "127.0.0.1:8080"
//是否使用TLS
OpenTLS = true
)
func main() {
//TLS连接
var opt []grpc.DialOption
if OpenTLS {
cert, err := credentials.NewClientTLSFromFile("..\\..\\key\\server.pem", "www.wy.com")
if err != nil {
fmt.Println("credentials fail ", err)
return
}
opt = append(opt, grpc.WithTransportCredentials(cert))
} else {
opt = append(opt, grpc.WithInsecure())
}
//加入流式拦截器
opt = append(opt, grpc.WithStreamInterceptor(StreamClientInterceptor))
conn, err := grpc.Dial(Addr, opt...)
if err != nil {
fmt.Println("Dial fail", err)
return
}
defer conn.Close()
//初始化服务器
c := hello.NewHelloClient(conn)
//单项请求
req := new(hello.HelloRequest)
req.Name = "gRPC"
resp, err := c.SayHello(context.Background(), req)
if err != nil {
fmt.Println("say hello fail", err)
return
}
fmt.Println(resp.Message)
//流式发送数据
Streaming(c)
}
type clientStream struct {
grpc.ClientStream
}
func (c clientStream) SendMsg(m interface{}) error {
log.Printf("[client SendMsg] send : %T\n", m)
return c.ClientStream.SendMsg(m)
}
func (c clientStream) RecvMsg(m interface{}) error {
log.Printf("[client RecvMsg] recv : %T\n", m)
return c.ClientStream.RecvMsg(m)
}
// 拦截器方法
func StreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
//前置逻辑
log.Printf("[StreamClientInterceptor] send req : %s\n", method)
//请求
s, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
return nil, err
}
return clientStream{s}, nil
}
// 流式发送
func Streaming(pb hello.HelloClient) error {
stream, err := pb.Streaming(context.Background())
if err != nil {
return err
}
for n := 0; n < 5; n++ {
log.Printf("[client Streaming] send : %d\n", n)
err = stream.Send(&hello.StreamRequest{Input: strconv.Itoa(n)})
if err != nil {
return err
}
resp, err := stream.Recv()
//发送完毕,退出
if err == io.EOF {
return nil
}
if err != nil {
return err
}
log.Printf("[client Streaming] recv : %s\n", resp.Output)
}
//停止发送
stream.CloseSend()
return nil
}
- 演示
注意点:
- server和client的recv和send互成一对,最后一次输出recv是结束消息,err==io.EOF
- 在自定义的RecvMsg方法中,前置位置只能读取消息的类型,而无法读取消息的实际数据,因为这个时候收到的消息还没有解析,如果需要接收消息的实际数据,需要把自定义的处理逻辑放后面。
func (s serverStream) RecvMsg(m interface{}) error {
err := s.ServerStream.RecvMsg(m)
log.Printf("[server Recv Stream]: recv : %T\n", m)
return err
}