简单说说redis分布式锁

什么是分布式锁

分布式锁(多服务共享锁)在分布式的部署环境下,通过锁机制来让多客户端互斥的对共享资源进行访问/操作。

为什么需要分布式锁

在单体应用服务里,不同的客户端操作同一个资源,我们可以通过操作系统提供的互斥(锁/信号量等等)来提供互斥的能力,保证操作资源的只有一个客户端。

在分布式的情况里,就需要第三方组件来保证对统一资源的操作的互斥。

(下单中,两个人下单,一个人下单请求走订单服务A机器,另一个人下单请求走订单服务B,这样用单体的思维处理就可能不是很合适,需要借用第三方组件配合来实现分布式锁)

分布式锁可以用redis, zookeeper , etcd等等来实现,下面我们简单说说....

redis分布式锁

简单例子

set key value ex/px nx 或setnx

以setnx 为例,可以使用 setnx key value 来进行 "加锁" ( setnx 主要 是nx加了语义:如果存在就不操作,不存在就添加),多个客户端确保只有一个加锁成功去操作统一资源

127.0.0.1:6379[1]> setnx lockObj 1   // 加锁
(integer) 1
127.0.0.1:6379[1]> setnx lockObj 1  // 存在了就不能再加锁
(integer) 0
127.0.0.1:6379[1]> get lockObj
"1"
127.0.0.1:6379[1]> del lockObj // 释放锁
(integer) 1
127.0.0.1:6379[1]> get lockObj
(nil)
127.0.0.1:6379[1]> setnx lockObj 1
(integer) 1

上面就是简单的加锁的例子,仔细思考下分布式锁使用的使用我们需要考虑哪些问题?

存在的问题

简单的总结了下,我们在使用redis分布式锁的时候需要考虑如下情况:

1- 死锁问题

2- 续锁生命周期

3- 操作原子性

4- 锁的归属权

5- redis 集群,锁的状态一致性

死锁问题

如果我们业务代码出现bug或服务器出现问题,没有及时释放锁,那么其他的客户端就永远获取不到这个加锁的资格。

这个时候我们就可以加上对应的处理逻辑:

golang 加上defer 加锁锁逻辑 , python try-Except-finally 用finally 释放锁。并且在加锁的时候加上过期时间(根据业务进行合适的加)

续锁生命周期

上面我们可以解决锁的释放问题,但是我们的业务处理时间不一定百分百能知道处理的时间,这个时候如果锁过期了但是资源操作没有做完,那么就会出现问题。

在java的 Redisson 有个watch 续命机制, golang 的话可以 借鉴 Rllock ,开启一个守护进程监听,定时续命(一定要提前续命,不要等到到时间再续)

操作原子性

在我们进行加锁 加过期时间的时候,这两个操作不能分两步操作。因为如果setnx加锁成功了,这时候失败了,那么这个锁就永远被占用了。

根据这个问题我们可以使用lua脚本或者使用第三方模块是可以同时进行这两个步骤的

`if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then return redis.call('PEXPIRE',KEYS[1],ARGV[2]) else return 0 end`

锁的归属权问题

现在有几个场景:

1- 客户端1 拿到锁处理业务, 没处理完已经过期了;这时候客户端2 拿到锁在处理,结果客户端1 处理完就释放锁了

2- 一个业务不同画像的人处理的业务不同,这时候我们就需要根据不同画像人进行分配 “锁”

我们在实际开发的时候有时候需要了解业务场景, 有时候需要给锁加一个所属权的令牌。可以在setnx key时,key设定特殊化的数值

redis 集群,锁的状态一致性

在redis采用集群(主从),如果master加锁失败了,这时候服务宕机了,slave还同步这个 key ,那么这个时候就会有客户端加锁成功

redis作者对于这个问题提出了解答:REDLOCK

zookeeper 实现分布式锁

简单例子

[zk: localhost:2181(CONNECTED) 62] create  /lock 
Created /lock
[zk: localhost:2181(CONNECTED) 63] create -s -e /lock/req  // 创建临时节点
Created /lock/req0000000000 
[zk: localhost:2181(CONNECTED) 64] create -s -e /lock/req
Created /lock/req0000000001
[zk: localhost:2181(CONNECTED) 65] create -s -e /lock/req
Created /lock/req0000000002
[zk: localhost:2181(CONNECTED) 67] ls /lock // 节点下的临时节点
[req0000000000, req0000000001, req0000000002] 
[zk: localhost:2181(CONNECTED) 68] delete /lock/req0000000000 // 释放第一锁

golang的例子

package main

import (
    "fmt"
    "github.com/samuel/go-zookeeper/zk"
    "sort"
    "time"
)

func sortChildren(children []string) {
    sort.Slice(children, func(i, j int) bool {
        return children[i] < children[j]
    })
}
func main() {
    go func() {
        conn, _, err := zk.Connect([]string{"xx.xx.xx.xx:2181"}, time.Second*5)
        if err != nil {
            fmt.Println("Connect:", err.Error())
            return
        }
        defer conn.Close()

        lockPath := "/locksObj"
        lockName := "lock"

        // 创建锁的根节点
        _, err = conn.Create(lockPath, []byte{}, int32(0), zk.WorldACL(zk.PermAll))
        if err != nil && err != zk.ErrNodeExists {
            fmt.Println("Create:", err.Error())
            return
        }

        // 获取锁
        lockNodePath, err := conn.CreateProtectedEphemeralSequential(lockPath+"/"+lockName+"-", []byte{}, zk.WorldACL(zk.PermAll))
        if err != nil {
            fmt.Println("CreateProtectedEphemeralSequential:", err.Error())
            return
        }
        // dowork

        for {
            children, _, err := conn.Children(lockPath)
            if err != nil {
                fmt.Println("Children:", err.Error())
                return
            }

            // 对子节点按照序列号进行排序
            sortChildren(children)

            // 检查自己创建的节点是否是第一个节点
            if lockNodePath == lockPath+"/"+children[0] {
                // 获取到了锁
                fmt.Println("Acquired lock")
                break
            }

            // 监听前一个节点的删除事件
            exists, _, watch, err := conn.ExistsW(lockPath + "/" + children[0])
            if err != nil {
                fmt.Println("ExistsW: ", err.Error())
                break
            }

            if !exists {
                // 前一个节点已删除,再次检查自己创建的节点是否是第一个节点
                children, _, err = conn.Children(lockPath)
                if err != nil {
                    fmt.Println("Children: ", err.Error())
                    break
                }

                sortChildren(children)

                if lockNodePath == lockPath+"/"+children[0] {
                    // 获取到了锁
                    fmt.Println("Acquired lock")
                    break
                }
            }

            // 等待前一个节点的删除事件
            <-watch
        }

        // 执行需要保护的代码
        fmt.Println("====start-1====")

        time.Sleep(3 * time.Second)
        fmt.Println(11111111)

        // 释放锁,删除自己创建的节点
        err = conn.Delete(lockNodePath, -1)
        if err != nil {
            fmt.Println("Delete: ", err.Error())
            return
        }

        fmt.Println("Released lock")
    }()
    go func() {
        conn, _, err := zk.Connect([]string{"xx.xx.xx.xx:2181"}, time.Second*5)
        if err != nil {
            fmt.Println("Connect:", err.Error())
            return
        }
        defer conn.Close()

        lockPath := "/locksObj"
        lockName := "lock"

        // 创建锁的根节点
        _, err = conn.Create(lockPath, []byte{}, int32(0), zk.WorldACL(zk.PermAll))
        if err != nil && err != zk.ErrNodeExists {
            fmt.Println("Create:", err.Error())
            return
        }

        // 获取锁
        lockNodePath, err := conn.CreateProtectedEphemeralSequential(lockPath+"/"+lockName+"-", []byte{}, zk.WorldACL(zk.PermAll))
        if err != nil {
            fmt.Println("CreateProtectedEphemeralSequential:", err.Error())
            return
        }

        for {
            children, _, err := conn.Children(lockPath)
            if err != nil {
                fmt.Println("Children:", err.Error())
                return
            }

            // 对子节点按照序列号进行排序
            sortChildren(children)

            // 检查自己创建的节点是否是第一个节点
            if lockNodePath == lockPath+"/"+children[0] {
                // 获取到了锁
                fmt.Println("Acquired lock")
                break
            }

            // 监听前一个节点的删除事件
            exists, _, watch, err := conn.ExistsW(lockPath + "/" + children[0])
            if err != nil {
                fmt.Println("ExistsW: ", err.Error())
                break
            }

            if !exists {
                // 前一个节点已删除,再次检查自己创建的节点是否是第一个节点
                children, _, err = conn.Children(lockPath)
                if err != nil {
                    fmt.Println("Children: ", err.Error())
                    break
                }

                sortChildren(children)

                if lockNodePath == lockPath+"/"+children[0] {
                    // 获取到了锁
                    fmt.Println("Acquired lock")
                    break
                }
            }

            // 等待前一个节点的删除事件
            <-watch
        }

        // 执行需要保护的代码
        fmt.Println("====start-2====")

        time.Sleep(5 * time.Second)
        fmt.Println(22222222)
        // 释放锁,删除自己创建的节点
        err = conn.Delete(lockNodePath, -1)
        if err != nil {
            fmt.Println("Delete: ", err.Error())
            return
        }

        fmt.Println("Released lock")
    }()
    time.Sleep(10 * time.Second)
}

zookeeper 怎么实现 分布式锁的

zookeeper 会建立一个长链接,监听锁对象节点的状态和事件

ETCD实现 分布式锁

简单实现

package main

import (
	"context"
	"fmt"
	clientv3 "go.etcd.io/etcd/client/v3"
	"time"
)


func main() {
	go func() {
		config := clientv3.Config{
			Endpoints:   []string{"xx.xx.xx.xx:2379"},
			DialTimeout: 5 * time.Second,
		}

		// 获取客户端连接
		client, err := clientv3.New(config)
		if err != nil {
			fmt.Println(err)
			return
		}

		//  上锁
		// 用于申请租约
		lease := clientv3.NewLease(client)

		// 申请一个10s的租约
		leaseGrantResp, err := lease.Grant(context.TODO(), 10) //10s
		if err != nil {
			fmt.Println(err)
			return
		}

		// 拿到租约的id
		leaseID := leaseGrantResp.ID

		ctx, cancelFunc := context.WithCancel(context.TODO())

		// 停止
		defer cancelFunc()
		// 确保函数退出后,租约会失效
		defer lease.Revoke(context.TODO(), leaseID)

		// 自动续租
		keepRespChan, err := lease.KeepAlive(ctx, leaseID)
		if err != nil {
			fmt.Println(err)
			return
		}

		// 处理续租应答的协程
		go func() {
			select {
			case keepResp := <-keepRespChan:
				if keepRespChan == nil {
					fmt.Println("lease has expired")
					break
				} else {
					// 每秒会续租一次
					fmt.Println("收到自动续租应答", keepResp.ID)
				}
			}
		}()

		// if key 不存在,then设置它,else抢锁失败
		kv := clientv3.NewKV(client)
		// 创建事务
		txn := kv.Txn(context.TODO())
		// 如果key不存在
		txn.If(clientv3.Compare(clientv3.CreateRevision("/lockObj/lock/job"), "=", 0)).
			Then(clientv3.OpPut("/lockObj/lock/job", "", clientv3.WithLease(leaseID))).
			Else(clientv3.OpGet("/lockObj/lock/job")) //如果key存在

		// 提交事务
		txnResp, err := txn.Commit()
		if err != nil {
			fmt.Println(err)
			return
		}

		// 判断是否抢到了锁
		if !txnResp.Succeeded {
			fmt.Println("锁被占用了:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
			return
		}

		// 处理业务

		fmt.Println("======work======")
		time.Sleep(5 * time.Second)
		fmt.Println("======END======")


	}()
	time.Sleep(20 * time.Second)
}

实现原理

etcd 支持以下功能,正是依赖这些功能来实现分布式锁的:

  • Lease机制:即租约机制(TTL,Time To Live),etcd可以为存储的kv对设置租约,当租约到期,kv将失效删除;同时也支持续约,keepalive
  • Revision机制:每个key带有一个Revision属性值,etcd每进行一次事务对应的全局Revision值都会+1,因此每个key对应的Revision属性值都是全局唯一的。通过比较Revision的大小就可以知道进行写操作的顺序
  • 在实现分布式锁时,多个程序同时抢锁,根据Revision值大小依次获得锁,避免“惊群效应”,实现公平锁
  • Prefix机制:也称为目录机制,可以根据前缀获得该目录下所有的key及其对应的属性值
  • watch机制:watch支持watch某个固定的key或者一个前缀目录,当watch的key发生变化,客户端将收到通知

执行流程

  • 步骤 1: 准备

客户端连接 Etcd,以 /lock/mylock 为前缀创建全局唯一的 key,假设第一个客户端对应的 key="/lock/mylock/UUID1",第二个为 key="/lock/mylock/UUID2";客户端分别为自己的 key 创建租约 - Lease,租约的长度根据业务耗时确定,假设为 15s;

  • 步骤 2: 创建定时任务作为租约的“心跳”

当一个客户端持有锁期间,其它客户端只能等待,为了避免等待期间租约失效,客户端需创建一个定时任务作为“心跳”进行续约。此外,如果持有锁期间客户端崩溃,心跳停止,key 将因租约到期而被删除,从而锁释放,避免死锁。

  • 步骤 3: 客户端将自己全局唯一的 key 写入 Etcd

进行 put 操作,将步骤 1 中创建的 key 绑定租约写入 Etcd,根据 Etcd 的 Revision 机制,假设两个客户端 put 操作返回的 Revision 分别为 1、2,客户端需记录 Revision 用以接下来判断自己是否获得锁。

  • 步骤 4: 客户端判断是否获得锁

客户端以前缀 /lock/mylock 读取 keyValue 列表(keyValue 中带有 key 对应的 Revision),判断自己 key 的 Revision 是否为当前列表中最小的,如果是则认为获得锁;否则监听列表中前一个 Revision 比自己小的 key 的删除事件,一旦监听到删除事件或者因租约失效而删除的事件,则自己获得锁。

  • 步骤 5: 执行业务

获得锁后,操作共享资源,执行业务代码。

  • 步骤 6: 释放锁

完成业务流程后,删除对应的key释放锁。

扩展

马丁·克莱普曼 对 分布式锁以及对redlock的看法

分布式锁的目的是确保在可能尝试执行同一工作的多个节点中,只有一个节点实际执行该操作(至少一次只有一个)。

主要有两个功能:

1- 效率:使用锁可以避免不必要地重复相同的工作,多执行一次也无妨,只要最终正确就行

2- 正确性:锁定可以防止并发进程互相干扰并扰乱系统状态。如果锁定失败并且两个节点同时处理同一数据,则会导致文件损坏、数据丢失、永久不一致。

马丁认为锁在分布式系统使用会碰到以下三类问题:

1- 网络延迟:您可以保证数据包始终在某个保证的最大延迟内到达

2- GC问题: 导致锁无法续期等等问题

3- 时钟飘移:依赖于时钟的就容易出现问题

马丁认为redlock 强依赖于时钟,节点之间时钟不对,会使锁不可靠:

假设系统有五个 Redis 节点(A、B、C、D 和 E)和两个客户端(1 和 2)。如果其中一个 Redis 节点上的时钟向前跳动,会发生什么情况?

  1. 客户端 1 获取节点 A、B、C 上的锁。由于网络问题,无法访问 D 和 E。
  2. 节点C上的时钟向前跳跃,导致锁过期。
  3. 客户端2获取节点C、D、E上的锁。由于网络问题,无法访问A和B。
  4. 客户 1 和 2 现在都相信他们持有锁。

如果 C 在将锁持久保存到磁盘之前崩溃并立即重新启动,则可能会发生类似的问题。因此,Redlock 文档建议延迟重新启动崩溃的节点,至少要延迟最长寿命锁的生存时间。但这种重新启动延迟再次依赖于对时间的相当准确的测量,并且如果时钟跳跃就会失败。

马丁提出了 fencing token 方案

客户端 1 获取租约并获得令牌 33,但随后它进入长时间暂停状态并且租约到期。客户端 2 获取租约,获取令牌 34(数字始终增加),然后将其写入发送到存储服务,包括 34 的令牌。稍后,客户端 1 恢复正常并将其写入发送到存储服务,包括其令牌值 33。但是,存储服务器记得它已经处理了具有更高令牌编号 (34) 的写入,因此它拒绝具有令牌 33 的请求。

总结

分布式锁不是百分百安全,我们要根据实际使用情况来考虑锁的使用(解决效率问题还是正确行问题),在使用分布式锁的时候我们需要考虑锁的续期,锁归属,集群数据一致性,操作原子性,GC,时钟飘逸,网络延迟等等的问题。在cap 理论里, redis保证了ap, zk和etcd保证cp ,所以实际使用中根据业务的情况,选择redis/zk/etcd之一来实现分布式锁。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/367855.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Jupyter Notebook中的%matplotlib inline详解

Jupyter Notebook中的%matplotlib inline详解 &#x1f335;文章目录&#x1f335; &#x1f333;引言&#x1f333;&#x1f333;什么是魔术命令&#x1f333;&#x1f333;%matplotlib inline详解&#x1f333;(&#x1f448;直入主题请点击)&#x1f333;小结&#x1f333;&…

【乳腺肿瘤诊断分类及预测】基于Elman神经网络

课题名称&#xff1a;基于Elman神经网络的乳腺肿瘤诊断分类及预测 版本日期&#xff1a;2023-05-15 运行方式: 直接运行Elman0501.m 文件即可 代码获取方式&#xff1a;私信博主或QQ&#xff1a;491052175 模型描述&#xff1a; 威斯康辛大学医学院经过多年的收集和整理&a…

前端 reduce()用法总结

定义 reduce()方法对数组中的每个元素执行一个由您提供的reducer函数(升序执行)&#xff0c;将其结果汇总为单个返回值。语法为&#xff1a; array.reduce(function(accumulator, currentValue, currentIndex, arr), initialValue); /*accumulator: 必需。累计器currentValu…

Android13源码下载及全编译流程

目录 一、源码下载 1.1、配置要求 1.1.1、硬件配置要求 1.1.2、软件要求 1.2、下载环境搭建 1.2.1、依赖安装 1.2.2、工具安装 1.2.3、git配置 1.2.4、repo配置 1.3、源码下载 1.3.1、明确下载版本 1.3.2、替换为清华源 1.3.3、初始化仓库并指定分支 1.3.4、同步全部源码 二、…

运用 StringJoiner 高效的拼接字符串

运用 StringJoiner 高效的拼接字符串 package com.zhong.stringdemo;import java.util.ArrayList; import java.util.StringJoiner;public class Test {public static void main(String[] args) {ArrayList<String> s new ArrayList<>();s.add("11");s.…

二进制_八进制_十六进制和十进制之间互转(简单明了)

文章目录 二进制_八进制_十六进制和十进制之间互转&#xff08;简单明了&#xff09;二进制八进制十六进制将二进制、八进制、十六进制转换为十进制1) 整数部分2) 小数部分 将十进制转换为二进制、八进制、十六进制1) 整数部分2) 小数部分 二进制和八进制、十六进制的转换1) 二…

css新手教程

css新手教程 课程&#xff1a;14、盒子模型及边框使用_哔哩哔哩_bilibili 一.什么是CSS 1.什么是CSS Cascading Style Sheet 层叠样式表。 CSS&#xff1a;表现&#xff08;美化网页&#xff09; 字体&#xff0c;颜色&#xff0c;边距&#xff0c;高度&#xff0c;宽度&am…

git 如何修改仓库地址

问题背景&#xff1a;组内更换大部门之后&#xff0c;代码仓的地址也迁移了&#xff0c;所以原来的git仓库地址失效了。 虽然重新建一个新的文件夹&#xff0c;再把每个项目都git clone一遍也可以。但是有点繁琐&#xff0c;而且有的项目本地还有已经开发一半的代码&#xff0c…

Pandas.Series.clip() 修剪数值范围 详解 含代码 含测试数据集 随Pandas版本持续更新

关于Pandas版本&#xff1a; 本文基于 pandas2.2.0 编写。 关于本文内容更新&#xff1a; 随着pandas的stable版本更迭&#xff0c;本文持续更新&#xff0c;不断完善补充。 传送门&#xff1a; Pandas API参考目录 传送门&#xff1a; Pandas 版本更新及新特性 传送门&…

虚幻UE5Matehuman定制自己的虚拟人,从相机拍照到UE5制作全流程

开启自己的元宇宙,照片扫描真实的人类,生成虚拟形象,保姆级教程,欢迎大家指正。 需要的软件: 制作流程: 一.拍照。 围绕自己拍照,大概20多张图就差不多了,把脑门漏出来,无需拍后脑勺。 拍照方式 例如,拍照时尽量不要在脸上体现出明显的光源方向。

低版本MATLAB打开高版本Simulink文件的方法

打开simulink&#xff0c;依次点击“建模”、“环境”、“simulink预设项”&#xff0c;如图所示&#xff1a; 然后在弹出的窗口中&#xff0c;点击“模型文件”&#xff0c;并取消勾选“不要加载用更新版本的simulink创建的模型”&#xff0c;接着点击“应用”即可。如图所示&…

使用unicorn模拟执行去除混淆

0. 前言 在分析某app的so时遇到了间接跳转类型的混淆&#xff0c;不去掉的话无法使用ida f5来静态分析&#xff0c;f5之后就长下面这样&#xff1a; 本文记录一下使用pythonunicorn模拟执行来去掉混淆的过程。 1. 分析混淆的模式 混淆的汇编代码如下&#xff1a; 可以看到…

《计算机网络简易速速上手小册》第7章:云计算与网络服务(2024 最新版)

文章目录 7.1 云服务模型&#xff08;IaaS, PaaS, SaaS&#xff09;- 你的技术魔法盒7.1.1 基础知识7.1.2 重点案例&#xff1a;构建和部署 Python Web 应用实现步骤具体操作步骤1&#xff1a;创建 Flask Web 应用步骤2&#xff1a;准备应用部署 7.1.3 拓展案例1&#xff1a;使…

机器学习5-线性回归之损失函数

在线性回归中&#xff0c;我们通常使用最小二乘法&#xff08;Ordinary Least Squares, OLS&#xff09;来求解损失函数。线性回归的目标是找到一条直线&#xff0c;使得预测值与实际值的平方差最小化。 假设有数据集 其中 是输入特征&#xff0c; 是对应的输出。 线性回归的…

查看docker服务的IP地址

要查看Docker容器服务的IP地址&#xff0c;可以使用以下命令&#xff1a; 如果你知道容器名称或容器ID&#xff0c;直接通过容器ID或容器名称来获取IP地址&#xff1a; # 使用容器ID获取IP地址 docker inspect -f {{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}} …

cesium-加载谷歌影像

cesium在开发的时候有可能会加载不同的影像&#xff0c;今天就先看一下加载谷歌的吧。 使用谷歌有个好处就是基本不会出现此区域无卫星图的情况 闲言话语不多说&#xff0c;看代码 <template><div id"cesiumContainer" style"height: 100vh;"&g…

【SpringBoot】application配置文件(4)

freemarker:cache: false 这是关于 freemarker 模板引擎的一个配置&#xff0c;用于控制模板的缓存行为 当cache 设置为 false 时&#xff0c;意味着每次请求时都会重新加载和编译模板&#xff0c;而不是从缓存中获取 编译模板。 将 cache 设置为 false 是为了在开发过程中获…

python求解中位数

首先将数组nums进行排序&#xff0c;然后找到中间位置的数值 如果数组长度n为奇数&#xff0c;则(n1)/2处对应值为中位数&#xff0c;如果数组下标从0开始&#xff0c;还需要减去1 如果数组长度n为偶数&#xff0c;则n/2,n/21两个位置数的平均值为中位数 假设中位数为x&#x…

机器学习复习(2)——线性回归SGD优化算法

目录 线性回归代码 线性回归理论 SGD算法 手撕线性回归算法 模型初始化 定义模型主体部分 定义线性回归模型训练过程 数据demo准备 模型训练与权重参数 定义线性回归预测函数 定义R2系数计算 可视化展示 预测结果 训练过程 sklearn进行机器学习 线性回归代码…

CSC联合培养博士申请亲历|联系外导的详细过程

在CSC申报的各环节中&#xff0c;联系外导获得邀请函是关键步骤。这位联培博士同学的这篇文章&#xff0c;非常详细且真实地记录了申请过程、心理感受&#xff0c;并提出有益的建议&#xff0c;小编特推荐给大家参考。 2024年国家留学基金委公派留学项目即将开始&#xff0c;其…