池化技术
- 池化技术能够减少资源对象的创建次数,提高程序的响应性能,特别是在高并发下这种提高更明显。
- 共同特征
- 对象创建时间长。
- 对象创建需要大量资源。
- 对象创建后可被重复使用。
数据库连接池
- 数据库连接池(Connection pooling)是程序启动时建立足够的数据库连接,并将这些连接组成一个连接池,由程序动态地对池中的连接进行申请,使用,释放。
- 数据库连接:服务器需要跟数据库建立 TCP 连接。
- TCP 连接需要三次握手建立连接,然后服务器就可以通过这条连接进行收发数据了。当这条连接使用完毕后,需要通过四次挥手断开连接。
- 收发数据的规律:请求回应模式。
- 收发的数据要符合 MySQL 协议,发数据的时候要通过 MySQL 协议进行编码,编码为二进制发送给对端,收数据的时候要通过 MySQL 协议进行解码。
- 所有跟 MySQL 进行交互的服务器都需要安装 MySQL 驱动 → 具备编码和解码的能力。
libmysqlclient.lib/.dll
(Windows)、libmysqlclient.a/.so
(Linux)、mysql.h
。官方实现的驱动采用的是阻塞 IO:当 IO 未就绪的时候会阻塞线程,比如当协议栈中的接收缓冲区没有数据时,而 read 是把内核态的接收缓冲区中的数据拷贝到用户态来,那么执行 read 的线程就会发生阻塞。- 自己实现 MySQL 协议的解码和编码。workflow、openresty 实现的是非阻塞 IO。
- MySQL 的网络模型
- 有一个主线程,它会调用
select
监听listenfd
。(MySQL 会被绑定在 3306 端口上)- 使用 select 的原因:select 可以跨平台。
- 只要有服务器与 MySQL 建立连接,select 就会响应一个读事件,MySQL 会为每一条连接分配一个线程;MySQL 最大连接数通常为 151。(my.cnf)
- 在这个线程中有一个
while
循环,不断地去read
数据,当界定出一个完整的数据包后,就会执行 SQL 的逻辑,最后把执行 SQL 的结果write
给服务器。while (true) { read 界定数据包 执行 SQL 的逻辑 write }
- 当建立多条连接时,MySQL 内部会有多个线程并发执行 SQL 语句。
- 有一个主线程,它会调用
- 短连接:不复用连接,每次执行 SQL 语句都会建立连接、断开连接。
- 优点:当数据库访问频次不高的时候,节省资源,每次用完就会销毁。
- 缺点:当数据库访问频次高的时候,会不断地创建资源、销毁资源。
- 长连接:会维持 TCP 连接,不让它断开,会复用这条连接去执行 SQL 语句。
- 数据库连接既是 TCP 连接,也是长连接。
- 没有 SQL 语句执行,如何维持 TCP 连接 ?
- TCP 中的 keepalive 选项只是保持服务器和 MySQL 的内核态网络协议栈是网络畅通的。
- 通过定时发送心跳的方式维持 TCP 连接:服务器发送一个数据包到达 MySQL,MySQL 通过系统调用取出数据包,然后再回一个数据包到达服务器,服务器通过系统调用取出数据包。调用
mysql_ping
和mysql_pong
接口。 - 定时发送心跳与 keepalive 区别:定时发送心跳除了可以探知连接是否可用,还可以确认处理 SQL 语句的线程是否活跃、是否发生阻塞。
同步连接和异步连接
- 同步连接:同步等待连接的返回,会阻塞当前线程。
- 异步连接
- 通过回调函数去处理返回结果,异步获取连接的返回。
void callback(SQLResult &res) { } DBImpl->AsyncQuery(sqlstr, callback); ... // 一段时间后执行 callback(res);
- 官方实现的 MySQL 驱动采用的是阻塞 IO,底层使用的是同步连接,需要占用一个线程;为了不阻塞当前线程,那么就需要另启线程去等待返回结果,这样一来阻塞的是另起的线程,而不是当前线程,从而实现异步连接。
- 通过回调函数去处理返回结果,异步获取连接的返回。
同步连接池
- 当前线程从连接池(线程安全)中获取可用连接(未被锁定的连接)。
- 同步连接池的大小:最多允许几个线程同时使用连接。
- 应用:服务器启动时,初始化资源。
- 每一条连接都对应一把锁。
异步连接池
- 任意线程向连接池投递执行 SQL 语句的请求,连接池依次从队列中取出任务执行。
- 异步连接池的大小:最多允许几个连接同时执行 SQL 语句。
- 应用:服务器启动后,业务处理。
- 每一条连接都对应一个线程。
- 请求和回应如何对应 ?
MySQL 连接驱动使用
-
sudo apt install libmysqlclient-dev
-
加载
mysql.h
以及动态库或静态库。 -
初始化连接驱动
mysql_library_init
。 -
使用 MySQL 连接驱动与 MySQL 进行交互(执行 SQL 语句)。
-
释放连接资源
mysql_library_end
。 -
裸 SQL 语句:每一条 SQL 语句都需要进行词法句法分析、制定执行计划。
void Execute(char const* sql);
-
预处理 SQL 语句:只需要执行一次词法句法分析、制定执行计划,下次再执行相同的 SQL 语句时,会直接将执行计划放到存储引擎中执行,效率更高。
void Execute(PreparedStatement<T>* stmt);
-
同步连接池和异步连接池跟具体的数据库绑定。
接口封装
- 同步接口使用
DirectExecute
不需要结果。Query
需要结果。
- 异步接口使用
Execute
不需要结果。AsyncExecute
需要结果。DelayQueryHolder
异步执行多个 SQL 语句。AsyncCallbackProcessor.h
封装了异步获取结果的接口。AddCallback
保存callback
以及future
。- 请求线程循环检测是否收到数据库的返回,并进行处理(执行回调函数)(
ProcessReadyCallbacks
)
- 异步接口封装
chain
:责任链模式。delay(holder)
:pipline 模式,异步执行多个 SQL 语句。transaction
事务模式。
void ProcessReadyCallbacks()
{
if (_callbacks.empty())
return;
std::vector<T> updateCallbacks{ std::move(_callbacks) };
updateCallbacks.erase(std::remove_if(updateCallbacks.begin(), updateCallbacks.end(), [](T& callback)
{
return callback.InvokeIfReady();
}), updateCallbacks.end());
_callbacks.insert(_callbacks.end(), std::make_move_iterator(updateCallbacks.begin()), std::make_move_iterator(updateCallbacks.end()));
}
bool QueryCallback::InvokeIfReady()
{
QueryCallbackData& callback = _callbacks.front();
auto checkStateAndReturnCompletion = [this]()
{
_callbacks.pop();
bool hasNext = !_isPrepared ? _string.valid() : _prepared.valid();
if (_callbacks.empty())
{
ASSERT(!hasNext);
return true;
}
// abort chain
if (!hasNext)
return true;
ASSERT(_isPrepared == _callbacks.front()._isPrepared);
return false;
};
if (!_isPrepared)
{
if (_string.valid() && _string.wait_for(std::chrono::seconds(0)) == std::future_status::ready)
{
QueryResultFuture f(std::move(_string));
std::function<void(QueryCallback&, QueryResult)> cb(std::move(callback._string));
cb(*this, f.get());
return checkStateAndReturnCompletion();
}
}
else
{
if (_prepared.valid() && _prepared.wait_for(std::chrono::seconds(0)) == std::future_status::ready)
{
PreparedQueryResultFuture f(std::move(_prepared));
std::function<void(QueryCallback&, PreparedQueryResult)> cb(std::move(callback._prepared));
cb(*this, f.get());
return checkStateAndReturnCompletion();
}
}
return false;
}