基本了解
客户端、长连接与租约的关系
客户端对象
etcd的客户端对象是用户与etcd服务进行交互的主要接口,主要功能就是存储、通知和事务等功能访问
- 键值存储:客户端通过put 和 get操作存储数据;数据存储在etcd的层级化键值数据库中
- 监听器(Watcher):客户端可以监听指定键的变化事件,如果键值发生改变,etcd会通过回调通知客户端;借助监听器可以实现动态管理配置和服务发现
- 事务:提供txn接口,允许客户端进行原子性操作
工作原理
- 客户端通过gRpc与etcd集群进行交互
- 连接时,客户端需要提供集群的地址和认证信息
- 连接建立后,客户端可以调用功能不同API完成具体功能
长连接
长连接是指客户端与etcd服务之间的持续连接,主要用于保持会话状态以及实现实时性功能
- 保持租约长期有效:租约需要客户端通过长连接向etcd定期发送心跳包,否则租约会到期
- 监听:监听功能依赖于长连接,当监听的键发生变化的时候,etcd通过长连接向客户端推送事件
- 减少开销:长连接避免了频繁的连接断开,节省了建立连接的时间和资源
工作原理
- 客户端与
etcd
集群节点通过 gRPC 建立连接 - 建立连接后,客户端发送
KeepAlive
请求以保持连接 - 如果
etcd
未在一定时间内接收到心跳,会认为连接断开
租约
etcd提供的一种机制,主要用于一段时间内绑定键值数据,并确保键值在租约有效期内有效
- 动态生存时间:键值可以绑定到租约,租约到期后,绑定的键值会自动删除
- 分布式锁:租约结合键值和keeplive实现分布式锁,确保资源的唯一占用
- Session管理:通过租约,可以熟实现分布式会话管理
工作原理
- 客户端调用
LeaseGrant
接口创建租约,并设置租期 - 将键值与租约绑定。例如,使用
Put
命令时指定租约ID - 客户端通过长连接定期向
etcd
发送心跳,续租以保持租约有效 - 如果未续约,租约到期后,绑定的键值会自动被删除
应用:服务实例向 etcd
注册自己(通过租约绑定信息),当实例下线或不可用时,租约自动到期并移除信息
基本使用
安装
框架安装
GitHub - etcd-cpp-apiv3/etcd-cpp-apiv3: The etcd-cpp-apiv3 is a C++ library for etcd's v3 client APIs, i.e., ETCDCTL_API=3.
验证动态库和相应的文件是否存在
使用
上传和获取逻辑
// get.cc
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/Watcher.hpp>
#include <etcd/Value.hpp>
#include <thread>
// 回调函数:用于处理监控到的键值变化事件
void callback(const etcd::Response &resp) {
// 如果收到的事件通知无效,打印错误信息并返回
if (!resp.is_ok()) {
std::cout << "收到一个错误的事件通知: " << resp.error_message() << std::endl;
return;
}
// 遍历事件列表,逐个处理每个事件
for (auto const& ev : resp.events()) {
// 如果事件类型是 PUT(键值被创建或修改)
if (ev.event_type() == etcd::Event::EventType::PUT) {
std::cout << "服务信息发生了改变:\n";
std::cout << "当前的值:" << ev.kv().key() << " - " << ev.kv().as_string() << std::endl;
std::cout << "原来的值:" << ev.prev_kv().key() << " - " << ev.prev_kv().as_string() << std::endl;
}
// 如果事件类型是 DELETE(键值被删除)
else if (ev.event_type() == etcd::Event::EventType::DELETE_) {
std::cout << "服务信息下线被删除:\n";
std::cout << "当前的值:" << ev.kv().key() << " - " << ev.kv().as_string() << std::endl;
std::cout << "原来的值:" << ev.prev_kv().key() << " - " << ev.prev_kv().as_string() << std::endl;
}
}
}
int main(int argc, char *argv[])
{
// 定义etcd服务的地址,默认使用本地运行的etcd服务
std::string etcd_host = "http://127.0.0.1:2379";
// 1. 实例化etcd客户端对象,用于和etcd服务交互
etcd::Client client(etcd_host);
// 2. 获取 "/service" 路径下的所有键值对
auto resp = client.ls("/service").get(); // 同步调用获取响应
if (!resp.is_ok()) {
// 如果获取失败,打印错误信息并退出程序
std::cout << "获取键值对数据失败: " << resp.error_message() << std::endl;
return -1;
}
// 3. 遍历返回的键值对,打印每个键值对的信息
int sz = resp.keys().size(); // 获取键值对的数量
for (int i = 0; i < sz; ++i) {
std::cout << resp.value(i).as_string() << " 可以提供 " << resp.key(i) << " 服务\n";
}
// 4. 实例化一个Watcher对象,用于监听 "/service" 路径下的键值变化
// 参数说明:
// - `client`: 使用前面实例化的客户端对象
// - "/service": 要监听的路径
// - `callback`: 监听到键值变化后调用的回调函数
// - `true`: 是否递归监听路径下的子路径(true 表示递归)
auto watcher = etcd::Watcher(client, "/service", callback, true);
// 5. 启动Watcher,阻塞当前线程,持续监听事件,直到程序被中断
watcher.Wait();
// 6. 结束程序
return 0;
}
//put.cc
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <thread>
int main(int argc, char *argv[])
{
std::string etcd_host = "http://127.0.0.1:2379";
//实例化客户端对象
etcd::Client client(etcd_host);
//获取租约保活对象--伴随着创建一个指定有效时长的租约
auto keep_alive = client.leasekeepalive(3).get();
//获取租约ID
auto lease_id = keep_alive->Lease();
//向etcd新增数据
auto resp1 = client.put("/service/user", "127.0.0.1:8080", lease_id).get();
if (resp1.is_ok() == false) {
std::cout << "新增数据失败:" << resp1.error_message() << std::endl;
return -1;
}
auto resp2 = client.put("/service/friend", "127.0.0.1:9090").get();
if (resp2.is_ok() == false) {
std::cout << "新增数据失败:" << resp2.error_message() << std::endl;
return -1;
}
std::this_thread::sleep_for(std::chrono::seconds(10));
return 0;
}
接口学习
基于上传和获取学习接口
etcd::Client
类
etcd::Client
是核心客户端类,用于与 etcd 服务进行交互,提供基本的键值存取和操作功能
构造函数
etcd_url
:etcd 服务地址(例如http://127.0.0.1:2379
)- 创建一个与 etcd 服务通信的客户端对象
etcd::Client client(const std::string &etcd_url);
ls()
方法
- 列出路径下所有的键值
key
:路径键,表示需要获取其子键的根路径(例如/service
)- 返回一个
std::future<etcd::Response>
,可以通过.get()
同步获取结果
std::future<etcd::Response> ls(const std::string &key);
auto resp = client.ls("/service").get();
if (resp.is_ok()) {
// 获取成功,遍历键值
for (const auto &key : resp.keys()) {
std::cout << key << ": " << resp.value(key).as_string() << std::endl;
}
} else {
std::cerr << "获取失败: " << resp.error_message() << std::endl;
}
etcd::Watcher
类
用于监听指定路径下的键值变化事件,并触发回调函数处理这些事件
构造函数
client
:etcd::Client
对象,用于连接 etcd 服务。key
:要监听的路径键(例如/service
)。callback
:回调函数,当监听到键值变化时触发,参数是etcd::Response
对象。recursive
:是否递归监听子路径下的键值变化,默认为false
。- 功能:创建一个监听器对象,监听制定路径下的键值变化
etcd::Watcher watcher(etcd::Client &client, const std::string &key,
std::function<void(const etcd::Response &)> callback,
bool recursive = false);
Wait()
方法
- 阻塞当前线程,持续监听键值变化事件,直到被中断
auto watcher = etcd::Watcher(client, "/service", callback, true);
watcher.Wait(); // 阻塞线程等待事件
etcd::Response
类
etcd 请求或事件的响应对象,包含操作结果及相关数据
is_ok()
方法
- 判断请求是否成功:成功则返回true,失败则返回false
bool is_ok() const;
error_message()
方法
- 获取请求或者事件失败的错误信息
std::string error_message() const;
keys()
方法
- 获取路径下所有键的列表
std::vector<std::string> keys() const;
value(key)
方法
- 返回指向键的值对象
etcd::Value value(const std::string &key) const;
events()
方法
- 获取监听器捕获到的事件列表
std::vector<etcd::Event> events() const;
etcd::Value
类
etcd::Value
表示 etcd 中的键值对,包含键、值及其元数据
key()
方法
- 获取键的名称
std::string key() const;
as_string()
方法
- 获取键对应的值(字符串形式)
std::string as_string() const;
etcd::Value val = resp.value(key);
std::cout << "Key: " << val.key() << ", Value: " << val.as_string() << std::endl;
二次封装
基本思想
服务注册客户端类(Registry)
主要目标
- 向etcd注册服务信息(通过键值对方式)并确保其活跃状态
- 借用租约机制保证注册信息的有效性,如果租约过期,服务注册信息回自动清除
实现逻辑
- 创建一个etcd客户端,用于与etcd进行通信
- 创建租约(Lease),为服务注册信息绑定一个动态生存空间
- 定期发送心跳保活(KeepLive),续约租约,确保服务信息不会被自动删除
- 提供一个registry方法,用于注册服务信息(键值对)
服务发现客户端类
主要目标
- 从etcd中获取指定路径下的当前服务信息
- 通过监听机制实时监控服务信息的变化(例如服务新增、服务下线)
- 使用回调函数处理服务变更事件
实现逻辑
- 创建etcd客户端用于与etcd通信
- 使用ls方法获取当前路径下所有的键值对,处理当前已有数据
- 创建Watcher对象,监听指定路径下的数据变更事件
- 在事件回调中,根据事件的类型调用对应的回调函数
具体实现
细节补充:Watcher中使用Bind进行回调的逻辑总结
- Watcher 检测到事件:
etcd::Watcher
检测到指定路径的键值发生了变化(例如新增或删除) - Watcher 调用回调函数:
Watcher
将变化事件封装为etcd::Response
对象,调用绑定后的函数对象,并将etcd::Response
作为参数传递 - 绑定的函数对象调用
callback
:其中,this
是Discovery
类的实例,resp
是etcd::Watcher
提供的事件响应 - callback 函数执行:
callback
使用resp
提供的事件信息,执行具体的业务逻辑(例如调用_put_cb
或_del_cb
)
this->callback(resp);
etcd二次封装实现
#pragma once
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/Watcher.hpp>
#include <etcd/Value.hpp>
#include <functional>
#include "logger.hpp"
namespace mag {
// 服务注册客户端类
class Registry
{
public:
using ptr = std::shared_ptr<Registry>;
// 构造函数:初始化etcd客户端和租约(Lease)机制
Registry(const std::string &host):
_client(std::make_shared<etcd::Client>(host)), // 创建etcd客户端
_keep_alive(_client->leasekeepalive(3).get()), // 创建一个3秒TTL的租约并保持续约
_lease_id(_keep_alive->Lease()){} // 获取租约ID
// 析构函数:取消租约续约,释放资源
~Registry() { _keep_alive->Cancel(); }
// 注册服务信息:将键值对存入etcd并绑定租约
bool registry(const std::string &key, const std::string &val)
{
auto resp = _client->put(key, val, _lease_id).get(); // 绑定租约,写入键值对
if (resp.is_ok() == false)
{
LOG_ERROR("注册数据失败:{}", resp.error_message()); // 打印错误日志
return false;
}
return true; // 返回注册结果
}
private:
std::shared_ptr<etcd::Client> _client; // etcd客户端对象
std::shared_ptr<etcd::KeepAlive> _keep_alive; // 租约续约对象
uint64_t _lease_id; // 租约ID
};
// 服务发现客户端类
class Discovery
{
public:
using ptr = std::shared_ptr<Discovery>;
using NotifyCallback = std::function<void(std::string, std::string)>;
// 构造函数:初始化etcd客户端,执行服务发现和事件监听
Discovery(const std::string &host,
const std::string &basedir,
const NotifyCallback &put_cb,
const NotifyCallback &del_cb)
:_client(std::make_shared<etcd::Client>(host)), // 创建etcd客户端
_put_cb(put_cb), _del_cb(del_cb) // 设置回调函数
{
// 服务发现:获取当前路径下的已有服务数据
auto resp = _client->ls(basedir).get(); // 列出路径下的所有键值
if (resp.is_ok() == false)
{
LOG_ERROR("获取服务信息数据失败:{}", resp.error_message()); // 打印错误日志
}
int sz = resp.keys().size();
for (int i = 0; i < sz; ++i)
{
if (_put_cb) _put_cb(resp.key(i), resp.value(i).as_string()); // 调用服务新增回调
}
// 事件监控:监听路径数据变更并调用回调处理
_watcher = std::make_shared<etcd::Watcher>(*_client.get(), basedir,
std::bind(&Discovery::callback, this, std::placeholders::_1), true);
}
// 析构函数:取消事件监听,释放资源
~Discovery() {
_watcher->Cancel();
}
private:
// 事件回调函数:处理服务新增和删除事件
void callback(const etcd::Response &resp)
{
if (resp.is_ok() == false) {
LOG_ERROR("收到一个错误的事件通知: {}", resp.error_message()); // 打印错误日志
return;
}
for (auto const& ev : resp.events())
{
// 新增服务事件
if (ev.event_type() == etcd::Event::EventType::PUT) {
if (_put_cb) _put_cb(ev.kv().key(), ev.kv().as_string()); // 调用服务新增回调
LOG_DEBUG("新增服务:{}-{}", ev.kv().key(), ev.kv().as_string()); // 打印调试信息
}
// 服务下线事件
else if (ev.event_type() == etcd::Event::EventType::DELETE_) {
if (_del_cb) _del_cb(ev.prev_kv().key(), ev.prev_kv().as_string()); // 调用服务删除回调
LOG_DEBUG("下线服务:{}-{}", ev.prev_kv().key(), ev.prev_kv().as_string()); // 打印调试信息
}
}
}
private:
NotifyCallback _put_cb; // 服务新增事件回调
NotifyCallback _del_cb; // 服务删除事件回调
std::shared_ptr<etcd::Client> _client; // etcd客户端对象
std::shared_ptr<etcd::Watcher> _watcher; // 事件监听器
};
}
测试注册和发现逻辑
#include "etcd.hpp"
#include <iostream>
#include <chrono>
#include <thread>
// 服务新增回调
void onServiceAdded(const std::string &key, const std::string &value) {
std::cout << "发现新增服务:" << key << " -> " << value << std::endl;
}
// 服务下线回调
void onServiceRemoved(const std::string &key, const std::string &value) {
std::cout << "服务下线:" << key << " -> " << value << std::endl;
}
int main() {
std::string etcd_host = "http://127.0.0.1:2379";
std::string watch_dir = "/service";
// 创建Discovery对象
mag::Discovery::ptr discovery = std::make_shared<mag::Discovery>(
etcd_host, watch_dir, onServiceAdded, onServiceRemoved
);
// 保持监听状态
std::cout << "开始监听服务变更,按 Ctrl+C 退出..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(600));
return 0;
}
#include "etcd.hpp"
#include <iostream>
#include <chrono>
#include <thread>
int main() {
std::string etcd_host = "http://127.0.0.1:2379";
std::string key = "/service/example";
std::string value = "127.0.0.1:8080";
// 创建Registry对象
mag::Registry::ptr registry = std::make_shared<mag::Registry>(etcd_host);
// 注册服务
if (registry->registry(key, value)) {
std::cout << "服务注册成功:" << key << " -> " << value << std::endl;
} else {
std::cerr << "服务注册失败!" << std::endl;
return -1;
}
// 保持注册信息的活跃状态
std::cout << "服务正在运行,按 Ctrl+C 退出..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(600));
return 0;
}
问题
编译问题慢
问题描述
多次对该文件进行编译,编译总是卡在80%左右停止,服务器环境是2核2G的云服务器
分析
经htop进行排查,发现部分服务占用了大量内存,从而导致编译写入和读取文件速度变慢
解决思路
停止占用内存较高的MySQL服务
sudo systemctl stop mysql
清理缓存和无用数据
sync; echo 3 | sudo tee /proc/sys/vm/drop_caches
删除临时文件进一步释放空间
sudo apt autoremove -y
sudo apt clean
使用两个线程对程序进行编译
make -j2