gRPC 双向流(Bidirectional Streaming RPC)的使用方法

gRPC 是一个支持多种语言的高性能 RPC 框架,拥有丰富的 API 来简化服务端和客户端的开发过程。gRPC 支持四种 RPC 类型:Unary RPC、Server Streaming RPC、Client Streaming RPC 和 Bidirectional Streaming RPC。下面是双向流 API 的使用方法。

双向流 API 示例

在双向流中,客户端和服务器之间可以同时发送和接收消息。以下是一个双向流的完整示例,展示了客户端和服务端之间的聊天功能。

1. 定义 Protobuf 文件

首先,定义一个 Protobuf 文件 chat.proto,描述服务和消息结构:

syntax = "proto3";

package chat;

service ChatService {
  // 双向流 RPC
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

message ChatMessage {
  string user = 1;
  string text = 2;
}
2. 编译 Protobuf 文件

使用 protoc 编译器生成 Go 代码:


protoc --go_out=paths=source_relative:. --go-grpc_out=paths=source_relative:. chat.proto

这会生成 chat.pb.go 文件,其中包含了定义的服务和消息。

3. 实现服务器

创建 server.go 文件,具体实现 ChatService 服务:

package main

import (
	"fmt"
	"io"
	"log"
	"net"

	pb "path/to/your/proto/package"
	"google.golang.org/grpc"
)

type server struct {
	pb.UnimplementedChatServiceServer
}

func (s *server) Chat(stream pb.ChatService_ChatServer) error {
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		log.Printf("Received message from %s: %s", in.User, in.Text)

		// Echo message back to client
		err = stream.Send(&pb.ChatMessage{User: "Server", Text: "Echo: " + in.Text})
		if err != nil {
			return err
		}
	}
}

func main() {
	listen, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	s := grpc.NewServer()
	pb.RegisterChatServiceServer(s, &server{})

	log.Println("Server is running on port :50051")
	if err := s.Serve(listen); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
4. 实现客户端

创建 client.go 文件,用于连接服务器并发送和接收消息:

package main

import (
	"bufio"
	"context"
	"fmt"
	"log"
	"os"
	"time"

	pb "path/to/your/proto/package"
	"google.golang.org/grpc"
)

func main() {
	conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	client := pb.NewChatServiceClient(conn)

	stream, err := client.Chat(context.Background())
	if err != nil {
		log.Fatalf("failed to create stream: %v", err)
	}

	waitc := make(chan struct{})

	go func() {
		reader := bufio.NewReader(os.Stdin)
		for {
			fmt.Print("Enter message: ")
			text, _ := reader.ReadString('\n')

			if err := stream.Send(&pb.ChatMessage{User: "Client", Text: text}); err != nil {
				log.Fatalf("failed to send message: %v", err)
				break
			}
		}
		stream.CloseSend()
	}()

	go func() {
		for {
			in, err := stream.Recv()
			if err == io.EOF {
				close(waitc)
				return
			}
			if err != nil {
				log.Fatalf("failed to receive message: %v", err)
				break
			}
			log.Printf("Received message from %s: %s", in.User, in.Text)
		}
	}()

	<-waitc
}
5. 运行示例

编译并运行服务器和客户端:

  • 编译服务器和客户端:
    go build -o server server.go
    go build -o client client.go
    

  • 启动服务器:
    ./server
    

  • 启动客户端:
    ./client
    
    总结

    在这个例子中,服务器和客户端可以通过双向流(Bidirectional Streaming RPC)方式进行实时通信。客户端通过终端输入消息,并通过 gRPC 发送到服务器,服务器接收到消息后回显给客户端。这个示例展示了 gRPC 强大且灵活的流处理能力,在实际项目中可以根据需求进行扩展和修改。

import cycle not allowedgo list
could not import example.com/user/go-project/grpc/service2 (missing metadata for import of "example.com/user/go-project/grpc/service2")

这个错误提示说您在导入路径中循环引用或缺少某些必要的元数据。通常,这些问题可能是因为错误的导入路径配置、循环依赖或者项目结构问题。

步骤 1:确认 go.mod 和目录结构正确

确保 go.mod 文件在项目根目录中,并且包含正确的模块路径和依赖项。

假设项目结构为:

/home/user/go-project/grpc/
├── go.mod
├── go.sum
├── service1/
│   ├── chat.proto
│   ├── chat.pb.go
│   ├── chat_grpc.pb.go
│   ├── server.go
│   └── client.go
└── service2/
    ├── chat.proto
    ├── chat.pb.go
    ├── chat_grpc.pb.go
    ├── server.go
    └── client.go

确认 go.mod 文件

在 ./grpc 目录下,初始化并正确配置 go.mod 文件:

cd /home/user/go-project/grpc
go mod init example.com/user/go-project/grpc

更新 go.mod 文件以包含必要的依赖,确保内容类似如下:

module example.com/user/go-project/grpc

go 1.17

require (
    google.golang.org/grpc v1.39.0
    google.golang.org/protobuf v1.26.0
)

运行以下命令来下载和更新所有依赖:

go mod tidy

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/923765.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ffmpeg视频滤镜:替换部分帧-freezeframes

滤镜描述 freezeframes 官网地址 > FFmpeg Filters Documentation 这个滤镜接收两个输入&#xff0c;然后会将第一个视频中的部分帧替换为第二个视频的某一帧。 滤镜使用 参数 freezeframes AVOptions:first <int64> ..FV....... set first fra…

解决SpringBoot连接Websocket报:请求路径 404 No static resource websocket.

问题发现 最近在工作中用到了WebSocket进行前后端的消息通信&#xff0c;后端代码编写完后&#xff0c;测试一下是否连接成功&#xff0c;发现报No static resource websocket.&#xff0c;看这个错貌似将接口变成了静态资源来访问了&#xff0c;第一时间觉得是端点没有注册成…

Leetcode322.零钱兑换(HOT100)

链接 代码&#xff1a; class Solution { public:int coinChange(vector<int>& coins, int amount) {vector<int> dp(amount1,amount1);//要兑换amount元硬币&#xff0c;我们就算是全选择1元的硬币&#xff0c;也不过是amount个&#xff0c;所以初始化amoun…

网络安全期末复习

第1章 网络安全概括 &#xff08;1&#xff09;用户模式切换到系统配置模式&#xff08;enable&#xff09;。 &#xff08;2&#xff09;显示当前位置的设置信息&#xff0c;很方便了解系统设置&#xff08;show running-config&#xff09;。 &#xff08;3&#xff09;显…

鸿蒙进阶篇-自定义组件

大家好&#xff0c;我是鸿蒙开天组&#xff0c;今天咱们来学习自定义组件。 一、自定义组件定义 在ArkUI中&#xff0c;UI显示的内容均为组件&#xff0c;由框架直接提供的称为系统组件&#xff0c;由开发者定义的称为自定义组件。在进行 UI 界面开发时&#xff0c;通常不是简…

深入浅出 WebSocket:构建实时数据大屏的高级实践

简介 请参考下方&#xff0c;学习入门操作 基于 Flask 和 Socket.IO 的 WebSocket 实时数据更新实现 在当今数字化时代&#xff0c;实时性是衡量互联网应用的重要指标之一。无论是股票交易、在线游戏&#xff0c;还是实时监控大屏&#xff0c;WebSocket 已成为实现高效、双向…

一键AI换脸软件,支持表情控制,唇形同步Facefusion-3.0.0发布!支持N卡和CPU,一键启动包

嗨,小伙伴们!还记得小编之前介绍的FaceFusion 2.6.1吗?今天给大家带来超级exciting的消息 —— FaceFusion 3.0.0闪亮登场啦! &#x1f31f; 3.0.0版本更新 &#x1f3d7;️ 全面重构:修复了不少小虫子,运行更稳定,再也不怕突然罢工啦! &#x1f600; Live Portrait功能:新增…

spring boot框架漏洞复现

spring - java开源框架有五种 Spring MVC、SpringBoot、SpringFramework、SpringSecurity、SpringCloud spring boot版本 版本1: 直接就在根下 / 版本2:根下的必须目录 /actuator/ 端口:9093 spring boot搭建 1:直接下载源码打包 2:运行编译好的jar包:actuator-testb…

hhdb数据库介绍(10-8)

首页 管理平台通过数据可视方式在首页功能中实时展示计算节点集群的数据量、访问流量、集群组件状态、告警事件、安全防控等用户关心的信息。 集群安全 邮件通知&#xff1a;根据通知设置中监控开关是否打开判断&#xff0c;分为&#xff1a;全部开启、未开启、部分开启&…

Vue前端开发-slot传参

slot 又称插槽&#xff0c;它是在子组件中为父组件提供的一个占位符&#xff0c;使用来表示&#xff0c;通过这个占位符&#xff0c;父组件可以向中填充任意的内容代码&#xff0c;这些代码将自动替换占位符的位置&#xff0c;从而轻松实现在父组件中控制子组件内容的需求。 作…

18:(标准库)DMA二:DMA+串口收发数据

DMA串口收发数据 1、DMA串口发送数据2、DMA中断串口接收定长数据包3、串口空闲中断DMA接收不定长数据包 1、DMA串口发送数据 当串口的波特率大于115200时&#xff0c;可以通过DMA1进行数据搬运&#xff0c;以防止数据的丢失。如上图所示&#xff1a;UART1的Tx发送请求使用DMA1的…

2024 java大厂面试复习总结(一)(持续更新)

10年java程序员&#xff0c;2024年正好35岁&#xff0c;2024年11月公司裁员&#xff0c;记录自己找工作时候复习的一些要点。 java基础 hashCode()与equals()的相关规定 如果两个对象相等&#xff0c;则hashcode一定也是相同的两个对象相等&#xff0c;对两个对象分别调用eq…

深度学习5

一、模型保存与加载 1、序列化方式 保存方式&#xff1a;torch.save(model, "model.pkl") 打开方式&#xff1a;model torch.load("model.pkl", map_location"cpu") ​ import torch import torch.nn as nnclass MyModle(nn.Module):def __ini…

Redis五大基本类型——Zset有序集合命令详解(命令用法详解+思维导图详解)

目录 一、Zset有序集合类型介绍 二、常见命令 1、ZADD 2、ZCARD 3、ZCOUNT 4、ZRANGE 5、ZREVRANGE 6、ZRANGEBYSCORE 7、ZREVRANGEBYSCORE 8、ZPOPMAX 9、ZPOPMIN 10、ZRANK 11、ZREVRANK 12、ZSCORE 13、ZREM 14、ZREMRANGEBYRANK 15、ZREMRANGEBYSCORE 16…

ARM架构 AArch64 基础知识介绍

介绍 aarch64是 ARM 架构的 64 位版本&#xff0c;它是 ARMv8 架构的一部分&#xff0c;被设计用来提供更高的性能和更大的地址空间&#xff0c;同时保持与 32 位 ARM 架构的兼容性。AArch64 是 ARMv8 的 64 位指令集架构&#xff08;ISA&#xff09;&#xff0c;它提供了丰富的…

Rust中Tracing 应用指南

欢迎来到这篇全面的Rust跟踪入门指南。Rust 的tracing是一个用于应用程序级别的诊断和调试的库。它提供了一种结构化的、异步感知的方式来记录日志和跟踪事件。与传统的日志记录相比&#xff0c;tracing能够更好地处理复杂的异步系统和分布式系统中的事件跟踪&#xff0c;帮助开…

极狐GitLab 17.6 正式发布几十项与 DevSecOps 相关的功能【三】

GitLab 是一个全球知名的一体化 DevOps 平台&#xff0c;很多人都通过私有化部署 GitLab 来进行源代码托管。极狐GitLab 是 GitLab 在中国的发行版&#xff0c;专门为中国程序员服务。可以一键式部署极狐GitLab。 学习极狐GitLab 的相关资料&#xff1a; 极狐GitLab 官网极狐…

WinFrom调用webapi接口另一个方法及其应用实例

1.调用接口方法 代码如下&#xff1a; public class WebAPI{#region WebAPI调用 public async Task<string> Call_Webapi(string Url, string Json) //url传入的是接口名称&#xff0c;json传入的是接口参数{string responseBody string.Empty; //responseBod…

elasticsearch的索引模版使用方法

5 索引模版⭐️⭐️⭐️⭐️⭐️ 索引模板就是创建索引时要遵循的模板规则索引模板仅对新创建的索引有效&#xff0c;已经创建的索引并不受索引模板的影响 5.1 索引模版的基本使用 1.查看所有的索引模板 GET 10.0.0.91:9200/_index_template2.创建自定义索引模板 xixi &…

从零开始学GeoServer源码(二)添加支持arcgis切片功能

文章目录 参考文章环境背景1、配置打包好的程序1.1、下载GeoServer的war包1.2、下载GeoWebCache1.3、拷贝jar包1.4、修改配置文件1.4.1、拷贝geowebcache-arcgiscache-context.xml1.4.2、修改geowebcache-core-context.xml1.4.3、修改geowebcache-servlet.xml 1.5、配置切片信息…