概要:本文介绍mysql连接池的实现,要求读者了解线程池
一、为什么需要mysql连接池?
资源复用 :不使用连接池,每次数据库请求都新建一条连接,将耗费系 统资源。 流程如下:
- 通过三次握手建立 TCP 连接
- MySQL 认证
- SQL 执行
- 通过四次挥手断开 TCP 连接
更快的系统响应速度:
1.一次连接建立和销毁,可复用同一条连接多次执行 SQL 语句。
2.统一的连接管理,避免数据库连接泄露
二、mysql连接池运行原理
三、代码实现
1.结构体定义
typedef struct task_t {
struct task_t *next; // 指向下一个任务节点
int clientfd; // 客户端fd
char SQL[MAX_SQL_LENGTH]; // SQL语句缓冲区
} task_t;
typedef struct task_queue_t { // task队列
task_t *head; // 指向队列的第一个task节点
task_t *tail; // 指向队列的最后一个task节点
int block; // 阻塞标志
pthread_spinlock_t lock; // 自旋锁变量
pthread_mutex_t mutex; // 互斥锁变量
pthread_cond_t cond; // 条件变量
} task_queue_t;
typedef struct argc {
MYSQL *mysql;
task_queue_t *queue;
} argc;
2.资源创建
a.任务队列
task_queue_t *task_queue_create() { // 创建一个任务队列
int ret;
task_queue_t *queue = (task_queue_t *)malloc(sizeof(task_queue_t));
if (queue) {
ret = pthread_mutex_init(&queue->mutex, NULL);
if (ret == 0) {
ret = pthread_cond_init(&queue->cond, NULL);
if (ret == 0) {
pthread_spin_init(&queue->lock, 0);
queue->head = NULL;
queue->tail = NULL;
queue->block = 1;
return queue;
}
}
free(queue);
}
return NULL;
}
b.mysql连接句柄
void mysql_conn_init(MYSQL* mysql) {
mysql_init(mysql); // 初始化mysql句柄
// 连接到MySQL数据库
mysql_real_connect(mysql, MYSQL_SERVER_IP, MYSQL_SERVER_USERNAME, MYSQL_SERVER_PASSWORD,
MYSQL_SERVER_DEFAULT_DB, MYSQL_SERVER_PORT, NULL, 0);
}
2.sql任务的添加、执行
a.push、pop
void add_task(task_queue_t *queue, task_t *task) { // 向任务队列中添加一个task
pthread_spin_lock(&queue->lock);
if (!queue->tail) {
queue->tail->next = task;
queue->tail = task;
}
else {
queue->head = task;
queue->tail = task;
}
pthread_spin_unlock(&queue->lock);
pthread_cond_signal(&queue->cond);
}
void *pop_task(task_queue_t *queue) { // 从任务队列中取出一个任务
pthread_spin_lock(&queue->lock);
if (queue->head == NULL) {
pthread_spin_unlock(&queue->lock);
return NULL;
}
// 取出队列中第一个任务
task_t *task;
task = queue->head;
queue->head = task->next;
//判断队列是否为空
if (queue->head == NULL) {
queue->tail = queue->head;
}
pthread_spin_unlock(&queue->lock);
return task;
}
task_t *get_task(task_queue_t *queue) { // 原子地从队列中取出一个任务
task_t *task;
while ((task = pop_task(queue)) == NULL) {
pthread_mutex_lock(&queue->mutex);
if (queue->block == 0) {
pthread_mutex_unlock(&queue->mutex);
return NULL;
}
pthread_cond_wait(&queue->cond, &queue->mutex);
pthread_mutex_unlock(&queue->mutex);
}
return task;
}
b.执行任务并将mysql服务器的回复信息转发给客户端
void *mysql_conn_thrd_worker(void *argc) {
task_t *task;
struct argc *arg = (struct argc*)argc;
task_queue_t *queue = arg->queue;
MYSQL *mysql = arg->mysql;
while (!destroy_pool) {
task = get_task(queue);
if(!task) break;
// 执行其中的SQL语句
mysql_real_query(mysql, task->SQL, strlen(task->SQL)); // 注入sql语句
MYSQL_RES *res = mysql_store_result(mysql); // 存储mysql返回信息
char response[64];
// 将mysql回复结果cpoy进response
if (res) {
MYSQL_ROW row;
row = mysql_fetch_row(res);
if (row) {
snprintf(response, sizeof(response), "%s", row[0]); // 假设结果为字符串类型,仅复制第一列数据
} else {
snprintf(response, sizeof(response), "No result found");
}
mysql_free_result(res); // 释放结果集
} else {
snprintf(response, sizeof(response), "Error retrieving result");
}
// 发送回复信息
send(task->clientfd, response, 64, 0);
// 销毁任务
free(task);
}
}
3.主线程接收客户端连接、sql请求
int tcp_server(task_queue_t *queue) {
// 初始化服务器套接字
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
perror("create sockfd fail\n");
return -1;
}
struct sockaddr_in addr;
memset(&addr, 0, sizeof(struct sockaddr_in));
addr.sin_family = AF_INET;
addr.sin_port = htons(2024);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
if (-1 == bind(sockfd, (struct sockaddr*)&addr, sizeof(addr))) {
perror("bind fail\n");
return -2;
}
if (-1 == listen(sockfd, 5)) {
perror("listen fail\n");
return -3;
}
//IO多路复用
int epfd = epoll_create(1);
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = sockfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);
struct epoll_event events[1024] = {0};
while (1) {
int ret = epoll_wait(epfd, events, 1024, -1);
if (ret == -1) {
perror("epoll_wait fail");
break;
}
int i = 0;
for (i = 0; i < ret; i++) {
if (sockfd == events[i].data.fd) {
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
fcntl(clientfd, F_SETFL, SOCK_NONBLOCK);
ev.events = EPOLLIN;
ev.data.fd = clientfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);
} else if (events[i].events & EPOLLIN){
while (1) {
char buffer[256] = {0};
int count = recv(events[i].data.fd, buffer, 10, 0);
if (count < 0) {//读取完毕或当前没有数据可读或者出错
if( (errno == EAGAIN) || (errno == EWOULDBLOCK)) {//读取完毕
printf("recv finished\n");
break;
}
//recv出错
close(events[i].data.fd);//关闭事件的套接字
break;
}
else if (count == 0) {//对方发送fin断开连接
epoll_ctl(epfd, EPOLL_CTL_DEL, events[i].data.fd, &ev);//移除该事件
close(events[i].data.fd);//关闭事件的套接字
break;
}
else {//接收到数据
task_t *task; // 将clientfd和buffer包装进task
init_task(task); // 初始化task的next指针
strcpy(task->SQL, buffer); // 装入sql请求
task->clientfd = events[i].data.fd;
add_task(queue, task); // 将此task添加到任务队列
}
}
}
}
}
close(sockfd);
return 0;
}
4.main函数
int main() {
// 工作队列
task_queue_t *queue = task_queue_create();
if (!queue) exit(1);
// 创建MySQL连接
MYSQL mysqls[NUM_MYSQL_CONNECTION] = {0};
int i;
for (i = 0; i < NUM_MYSQL_CONNECTION; i++) {
mysql_conn_init(&mysqls[i]);
}
// 创建工作线程
pthread_t threadid[NUM_MYSQL_CONNECTION];
for (i = 0; i < NUM_MYSQL_CONNECTION; i++) {
struct argc *argc = (struct argc *)malloc(sizeof(struct argc));
argc->mysql = &mysqls[i];
argc->queue = queue;
pthread_create(&threadid[i], NULL, mysql_conn_thrd_worker, argc);
free(argc);
}
tcp_server(queue); // Tcp 服务器,接收客户端连接,包装SQL请求信息并添加到工作队列
return 0;
}
推荐学习 https://xxetb.xetslk.com/s/p5Ibb