Rust : windows下protobuf和压缩传输方案

此前dbpystream库是用python开发 web api。今天在rust中试用一下protobuf。

本文关键词:编译器、protobuf、proto文件、序列化、zstd压缩,build。

一、 protobuf编译器下载

具体见相关文章。没有编译器,protobuf无法运行。
windows参见:

https://blog.csdn.net/wowotuo/article/details/139458846?spm=1001.2014.3001.5502。

二、proto文件的准备

proto文件中主要模拟了一个dbpystream中一个get_price函数的输入和输出的格式,输入HistoryBarRequest ,输出HistoryBarResponse。HistoryBarResponse中,有代码名称,日期,开盘价,最高价等。
在格式中,包括了string,TimeStamp,double; 其中repeated就是vec格式。

syntax = "proto3";
package dbdata;
import public "google/protobuf/timestamp.proto";
service DataService {
  rpc query (HistoryBarRequest) returns (HistoryBarRequest) {}
}
service Login{
  rpc auth (Auth) returns (Response) {}
}
message Auth{
   string id =1; 
   string password=2; 
}
message HistoryBarRequest {
  string security  = 1;
  string frequency = 2;
  FieldParam fields     = 3;
  google.protobuf.Timestamp start_date = 4;//收集时间
  google.protobuf.Timestamp end_date = 5;//收集时间
  bool is_fq  =6 ; 
}
message HistoryBarResponse{
  repeated string securitycode = 1;
  repeated google.protobuf.Timestamp  datetime =2;
  repeated double  open = 3;
  repeated double  high = 4;
  repeated double close = 5;
  repeated double low =6;
  repeated double volume=7;
  repeated double amount=8;
  repeated sint64 is_fq = 9;
}

message FieldParam{
  bool is_all = 1;
}

message Response {
  bool status = 1;
  bytes msg   = 2;
  string error = 3;
}

三、toml文件、文件目录结构、build.rs
1、toml文件有

[package]
name = "clap-2"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
axum = "0.7.5" # web 服务器
anyhow = "1" # 错误处理
reqwest = { version = "0.12.4", features = ["json"] } # HTTP 客户端
tokio = { version = "1", features = ["full"] } # 异步处理库
prost = "0.12.6"
# Only necessary if using Protobuf well-known types:
prost-types = "0.12.6"
serde = { version = "1", features = ["derive"] } # 序列化/反序列化数据
polars = { version = "0.39.0", features = ["json"]}
chrono = { version = "0.4", features = ["unstable-locales"] }
zstd = "0.13" # 压缩库
[build-dependencies]
prost-build = "0.12.6" # 编译 protobuf

上面polars,chrono,prost-types,prost-build,prost,zstd是关键库,其它暂时可以不看。

2、目录结构
具体如下:

PS D:\my_program\clap-2> tree /F
卷 新加卷 的文件夹 PATH 列表
卷序列号为 D855-8BFE
D:.
│  .gitignore
│  build.rs
│  Cargo.lock
│  Cargo.toml
│  dbdata.proto
│
└─src
    │  main.rs
    │
    └─pb
            dbdata.rs
            mod.rs

可见,在src/目录下,手动创建了一个pb文件夹,存放未来生成的dbdata.proto文件。

3、build.rs
在src同级目录上(如上),创建build.rs,具体如下:

fn main() {
    prost_build::Config::new()
        .out_dir("src/pb")//设置proto输出目录
        .compile_protos(&["dbdata.proto"], &["."])//我们要处理的proto文件
        .unwrap();
} 

运行cargo build,即生成了dbdata.rs,具体内容如下:

// This file is @generated by prost-build.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Auth {
    #[prost(string, tag = "1")]
    pub id: ::prost::alloc::string::String,
    #[prost(string, tag = "2")]
    pub password: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HistoryBarRequest {
    #[prost(string, tag = "1")]
    pub security: ::prost::alloc::string::String,
    #[prost(string, tag = "2")]
    pub frequency: ::prost::alloc::string::String,
    #[prost(message, optional, tag = "3")]
    pub fields: ::core::option::Option<FieldParam>,
    /// 收集时间
    #[prost(message, optional, tag = "4")]
    pub start_date: ::core::option::Option<::prost_types::Timestamp>,
    /// 收集时间
    #[prost(message, optional, tag = "5")]
    pub end_date: ::core::option::Option<::prost_types::Timestamp>,
    #[prost(bool, tag = "6")]
    pub is_fq: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HistoryBarResponse {
    #[prost(string, repeated, tag = "1")]
    pub securitycode: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
    #[prost(message, repeated, tag = "2")]
    pub datetime: ::prost::alloc::vec::Vec<::prost_types::Timestamp>,
    #[prost(double, repeated, tag = "3")]
    pub open: ::prost::alloc::vec::Vec<f64>,
    #[prost(double, repeated, tag = "4")]
    pub high: ::prost::alloc::vec::Vec<f64>,
    #[prost(double, repeated, tag = "5")]
    pub close: ::prost::alloc::vec::Vec<f64>,
    #[prost(double, repeated, tag = "6")]
    pub low: ::prost::alloc::vec::Vec<f64>,
    #[prost(double, repeated, tag = "7")]
    pub volume: ::prost::alloc::vec::Vec<f64>,
    #[prost(double, repeated, tag = "8")]
    pub amount: ::prost::alloc::vec::Vec<f64>,
    #[prost(sint64, repeated, tag = "9")]
    pub is_fq: ::prost::alloc::vec::Vec<i64>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FieldParam {
    #[prost(bool, tag = "1")]
    pub is_all: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Response {
    #[prost(bool, tag = "1")]
    pub status: bool,
    #[prost(bytes = "vec", tag = "2")]
    pub msg: ::prost::alloc::vec::Vec<u8>,
    #[prost(string, tag = "3")]
    pub error: ::prost::alloc::string::String,
}

4、mod.rs
在pb目录下,创建mod.rs:

pub mod dbdata;

四、原始数据、main.rs

1、原始数据准备

这个原始数据的格式,即收到request后,将发送这个数据内容出去。
在这里插入图片描述文件名称是"C:\Users\Desktop\test.csv"。
这里采用了polars来读取csv文件。

2、main.rs

下面的main.rs模拟了收到resquest,发送response的过程。这个过程可以用web框架,如axum,也可以用grpc框架。这部分不是今天的重点。

需要说明的是:在序列化HistoryBarResponse的基础上,并用zstd库进行了压缩打包,进一步减少了二进制对象的大小,有利于网络传输。

use pb::dbdata::{self, HistoryBarResponse,Response};
mod pb;
use prost_types::Timestamp;
use std::time::{Duration, SystemTime};
use polars::prelude::*;
use chrono::{NaiveDate, NaiveDateTime,NaiveTime};
use zstd;
fn main() ->Result<(),PolarsError>{
    let request = dbdata::HistoryBarRequest {
        security: String::from("600036.XSHG"),
        frequency: String::from("1minute"),
        fields: Some(dbdata::FieldParam {is_all:true}),
        start_date: Some(prost_types::Timestamp::from(SystemTime::now()-Duration::from_secs(3600*12*250))),
        end_date:Some(prost_types::Timestamp::from(SystemTime::now())),
        is_fq:true,
    };
    println!("模拟收到request:{:?}",request);
    println!("模拟开始进行相应的数据处理.....");
    let file = r"C:\Users\hongsl\Desktop\test.csv";
    let df: DataFrame = CsvReader::from_path(file)?
            .has_header(true)
            .finish().unwrap();
    println!("starting...");
    println!("df: {:?}",df);
    let res_raw = HistoryBarResponse{
        securitycode : df.column("securitycode")?.str()?.into_no_null_iter().map(|s|String::from(s)).collect(),
        datetime:df.column("date")?.str()?.into_no_null_iter()
        .map(|t| convert(t)).collect(),
        open:df.column("open")?.f64()?.into_no_null_iter().collect(),
        high:df.column("high")?.f64()?.into_no_null_iter().collect(),
        close:df.column("close")?.f64()?.into_no_null_iter().collect(),
        low:df.column("low")?.f64()?.into_no_null_iter().collect(),
        volume:df.column("volume")?.i64()?.into_no_null_iter().map(|v|v as f64).collect(),
        amount:df.column("amount")?.f64()?.into_no_null_iter().collect(),
        is_fq:df.column("is_fq")?.i64()?.into_no_null_iter().collect(),
    };
    //println!("{:?}", res);
    let encoded_raw = prost::Message::encode_to_vec(&res_raw);
    let compression_level = 3;
    // 服务端对序列化对象进行压缩,
    let compressed = zstd::encode_all(&*encoded_raw, compression_level).unwrap();
    // 服务端模拟通过web或grpc发送
    let res = Response{
        status:true,
        msg: compressed,
        error:String::from(""),
    };
    let encoded = prost::Message::encode_to_vec(&res);

    // 模拟客户端接收到web或grpc相应的数据对象
    let decoded_raw =  < pb::dbdata::Response as prost::Message>::decode(&encoded[..]).unwrap();
    
    // 并进行解压,得到Hist
    let decoded_raw: Vec<u8> = zstd::decode_all(decoded_raw.msg.as_slice()).unwrap();
    let decoded  = < pb::dbdata::HistoryBarResponse as prost::Message>::decode(&decoded_raw[..]).unwrap();
    
    println!("模拟发送相应的数据: {:?}", &decoded.securitycode[0]);
    Ok(())
}

fn convert(dt_str:&str) ->Timestamp {
    let naive_date = NaiveDate::parse_from_str(dt_str, "%Y/%m/%d").unwrap();
    let nano_second = NaiveTime::from_hms_milli_opt(0, 0, 0, 0).unwrap();
    let dt: NaiveDateTime = naive_date.and_time(nano_second );
    Timestamp{
        seconds:dt.and_utc().timestamp(),
        nanos:0,
    }
}

运行如下:

模拟收到request:HistoryBarRequest { security: "600036.XSHG", frequency: "1minute", fields: Some(FieldParam { 
is_all: true }), start_date: Some(Timestamp { seconds: 1707035277, nanos: 595181300 }), end_date: Some(Timestamp { seconds: 1717835277, nanos: 595183100 }), is_fq: true }
模拟开始进行相应的数据处理.....
starting...
df: shape: (482, 9)
┌──────────────┬───────────┬────────┬────────┬───┬────────┬────────┬─────────────┬───────┐
│ securitycode ┆ dateopen   ┆ high   ┆ … ┆ close  ┆ volume ┆ amount      ┆ is_fq │
│ ---          ┆ ---       ┆ ---    ┆ ---    ┆   ┆ ---    ┆ ---    ┆ ---         ┆ ---   │
│ str          ┆ str       ┆ f64    ┆ f64    ┆   ┆ f64    ┆ i64    ┆ f64         ┆ i64   │
╞══════════════╪═══════════╪════════╪════════╪═══╪════════╪════════╪═════════════╪═══════╡
│ 600036.XSHG  ┆ 2021/2/3  ┆ 1210.41222.3 ┆ … ┆ 1221.5122341.4943831e7 ┆ 1     │
│ 600037.XSHG  ┆ 2021/2/4  ┆ 1210.51222.4 ┆ … ┆ 1221.6122351.4946276e7 ┆ 1     │
│ 600038.XSHG  ┆ 2021/2/5  ┆ 1210.61222.5 ┆ … ┆ 1221.7122361.4949e7    ┆ 1     │
│ 600039.XSHG  ┆ 2021/2/6  ┆ 1210.71222.6 ┆ … ┆ 1221.8122371.4951e7    ┆ 1     │
│ 600040.XSHG  ┆ 2021/2/7  ┆ 1210.81222.7 ┆ … ┆ 1221.9122381.4954e7    ┆ 1     │
│ …            ┆ …         ┆ …      ┆ …      ┆ … ┆ …      ┆ …      ┆ …           ┆ …     │
│ 600513.XSHG  ┆ 2022/5/26 ┆ 1258.11270.0 ┆ … ┆ 1269.2127111.6133e7    ┆ 1     │
│ 600514.XSHG  ┆ 2022/5/27 ┆ 1258.21270.1 ┆ … ┆ 1269.3127121.6135e7    ┆ 1     │
│ 600515.XSHG  ┆ 2022/5/28 ┆ 1258.31270.2 ┆ … ┆ 1269.4127131.6138e7    ┆ 1     │
│ 600516.XSHG  ┆ 2022/5/29 ┆ 1258.41270.3 ┆ … ┆ 1269.5127141.6140423e7 ┆ 1     │
│ 600517.XSHG  ┆ 2022/5/30 ┆ 1258.51270.4 ┆ … ┆ 1269.6127151.6142964e7 ┆ 1     │
└──────────────┴───────────┴────────┴────────┴───┴────────┴────────┴─────────────┴───────┘
模拟接收并解析发送相应的数据: "600036.XSHG"

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/699532.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

鸿蒙原生开发——轻内核A核源码分析系列三 物理内存(2)

3.1.2.3 函数OsVmPhysLargeAlloc 当执行到这个函数时&#xff0c;说明空闲链表上的单个内存页节点的大小已经不能满足要求&#xff0c;超过了第9个链表上的内存页节点的大小了。⑴处计算需要申请的内存大小。⑵从最大的链表上进行遍历每一个内存页节点。⑶根据每个内存页的开始…

02-DHCP原理与配置

1、DHCP的工作原理 当局域网中有大量的主机时&#xff0c;如果逐个为每一台主机手动设置IP地址、默认网关、DNS服务器地址等网络参数&#xff0c;这显然是一个费力也未必讨好的办法。 而DHCP服务器的应用&#xff0c;正好可以解决这一问题。 1.1 DHCP是什么 DHCP——动态主机…

[2024-06]-[大模型]-[Ollama] 0-相关命令

常用的ollama命令[持续更新中] ollama更新&#xff1a; curl https://ollama.ai/install.sh |sh带着flash attention启动&#xff1a; OLLAMA_FLASH_ATTENTION1 ollama serve停止ollama服务&#xff1a; sudo systemctl stop ollama note&#xff1a;目前遇到sudo systemctl …

驱动开发之 input 子系统

1.input 子系统介绍 input 就是输入的意思&#xff0c;input 子系统就是管理输入的子系统&#xff0c;和 pinctrl、gpio 子系统 一样&#xff0c;都是 Linux 内核针对某一类设备而创建的框架。比如按键输入、键盘、鼠标、触摸屏等 等这些都属于输入设备&#xff0c;不同的输入…

一文教你如何实现并发请求的失败自动重试及重试次数限制

需求 在并发接口请求的时候&#xff0c;能够自动对失败的请求进行重发尝试&#xff08;超过指定重试次数则不再重试&#xff09;,并将最终的结果返回&#xff08;包含每个请求是否成功、返回结果&#xff09; 核心思路 代码实现 使用案例 为了演示我们代码的最终实现效果&a…

使用 python 将 Markdown 文件转换为 ppt演示文稿

在这篇博客中&#xff0c;我们将展示如何使用 wxPython 创建一个简单的图形用户界面 (GUI)&#xff0c;以将 Markdown 文件转换为 PowerPoint 演示文稿。我们将利用 markdown2 模块将 Markdown 转换为 HTML&#xff0c;并使用 python-pptx 模块将 HTML 内容转换为 PowerPoint 幻…

HarmonyOS未来五年的市场展望

一、引言 随着科技的不断进步和消费者对于智能化设备需求的日益增长&#xff0c;操作系统作为连接硬件与软件的核心平台&#xff0c;其重要性愈发凸显。HarmonyOS&#xff08;鸿蒙系统&#xff09;&#xff0c;作为华为自主研发的分布式操作系统&#xff0c;自诞生以来便备受瞩…

6月11号作业

思维导图 #include <iostream> using namespace std; class Animal { private:string name; public:Animal(){}Animal(string name):name(name){//cout << "Animal&#xff1b;有参" << endl;}virtual void perform(){cout << "讲解员的…

UE4_后期_ben_模糊和锐化滤镜

学习笔记&#xff0c;不喜勿喷&#xff0c;侵权立删&#xff0c;祝愿生活越来越好&#xff01; 本篇教程主要介绍后期处理的简单模糊和锐化滤镜效果&#xff0c;学习之前首先要回顾下上节课介绍的屏幕扭曲效果&#xff1a; 这是全屏效果&#xff0c;然后又介绍了几种蒙版&#…

【PX4-AutoPilot教程-TIPS】PX4加速度计陀螺仪滤波器参数设置

PX4加速度计陀螺仪滤波器参数设置 前期准备滤波前FFT图滤波后FFT图 环境&#xff1a; 日志分析软件 : Flight Review PX4 &#xff1a;1.13.0 前期准备 进行滤波器参数设置的前提是飞机简单调试过PID已经可以稳定起飞&#xff0c;开源飞控的很多默认参数是可以让飞机平稳起…

springSecurity学习笔记(一)

简介 Spring Security是一个Java框架&#xff0c;用于保护应用程序的安全性。它提供了一套全面的安全解决方案&#xff0c;包括身份验证、授权、防止攻击等功能。Spring Security基于过滤器链的概念&#xff0c;可以轻松地集成到任何基于Spring的应用程序中。它支持多种身份验…

记一次华为2288H V5更换主板的辛酸

1、开机提示找不到设备&#xff0c;通过带外检查硬盘raid是否正常&#xff0c;如果正常就不是硬件问题&#xff0c;也不会是线没接好 2、网络不通&#xff0c;服务重启啥的都正常不会报错&#xff0c;就是ping不通网关&#xff0c;后来通过带外发现是网卡漂移了。 核对mac地址发…

Docker 国内镜像源更换

实现 替换docker 镜像源 前提要求 安装 docker docker-compose 参考创建一键更换docker国内镜像源 Docker 镜像代理DaoCloud 镜像站百度云 https://mirror.baidubce.com南京大学镜像站

若依RuoYi-Vue分离版—增加通知公告预览及缩放功能

若依RuoYi-Vue分离版—增加通知公告预览及缩放功能 前言开发通知公告 前言 若依分离版的通知公告没有预览功能&#xff0c;想开发通知公告功能 开发通知公告 效果如下 具体开发内容 修改若依notice代码如下。 <template><div class"app-container"&g…

16. 《C语言》——【牛客网BC124 —— BC130题目讲解】

亲爱的读者&#xff0c;大家好&#xff01;我是一名正在学习编程的高校生。在这个博客里&#xff0c;我将和大家一起探讨编程技巧、分享实用工具&#xff0c;并交流学习心得。希望通过我的博客&#xff0c;你能学到有用的知识&#xff0c;提高自己的技能&#xff0c;成为一名优…

orbslam2代码解读(4):loopclosing回环检测线程

书接上回&#xff0c;介绍完了局部建图线程&#xff0c;局部建图线程在进行局部BA之后&#xff0c;也会将新的关键帧mpLoopCloser放进回环线程的mlpLoopKeyFrameQueue容器中。所以这时候回环检测线程就根据这个新的关键帧来进行回环检测的操作。 回环检测的主要程序 // 线程主…

websocket php workerman 服务器nginx配置wss协议

首先 Nginx的版本要高&#xff0c;尽量用当前最新稳定版本。 其次 WSS协议&#xff0c;是在HTTPS协议的基础上&#xff0c;进行协议升级&#xff0c;进行通讯的&#xff0c;所以先要保证你有一个 HTTPS正常的WEB站点。 所以&#xff0c;通过Nginx -V 请保证 一定有 --with-ht…

Spring-Security(二)OAuth2认证详解(持续更新)

Spring Security & Oauth2系列&#xff1a; Spring Security&#xff08;一&#xff09; 源码分析及认证流程 Spring Security&#xff08;二&#xff09;OAuth2认证详解及自定义异常处理 文章目录 1、OAuth2.0 简介1.1 OAuth2.0 相关名词解释1.2 四种授权模式 1.3 、OAu…

2023 hnust 湖科大 嵌入式 实验报告+代码及复习资料等

2023 hnust 湖科大 嵌入式 实验报告代码及复习资料等 目录 流水灯 1 8位数码管动态扫描 3 按键输入 5 温度与关照 7 看门狗 9 内容 报告 代码 下载链接 https://pan.baidu.com/s/1LIN8rm42yrukXliI3XyZ1g?pwd1111

Java高阶数据结构-----并查集(详解)

目录 &#x1f9d0;一.并查集的基本概念&实例&#xff1a; &#x1f92a;二.并查集代码&#xff1a; &#x1f602;三&#xff1a;并查集的一些习题&#xff1a; A.省份数量 B.等式方程的可满足性 &#x1f9d0;一.并查集的基本概念&实例&#xff1a; 并查集概念&…