【linux 多进程并发】0302 Linux下多进程模型的网络服务器架构设计,实时响应多客户端请求

0302 多进程网络服务器架构

专栏内容

  • postgresql使用入门基础
  • 手写数据库toadb
  • 并发编程

个人主页:我的主页
管理社区:开源数据库
座右铭:天行健,君子以自强不息;地势坤,君子以厚德载物.

一、概述


在大规模数据处理中,会有大量的客户端接入同一台服务器,每个客户端都需要长时间提供服务。

服务器采用中心化的部署,而客户端往往分散在不同机器上,服务器与客户端之间跨网络通信,一般采用C/S架构。

而服务端的架构需要能应对大量并发客户端,同时可以给每个客户端独占的服务,这就用到了多任务的网络模型架构,下面我们来看看用多进程如何实现。

二、多路复用的网络模型


C/S架构中,处理大量的网络请求,需要一套基于多路复用的网络处理模型。

  • 可以同时处理网络连接请求和网络数据传递;
  • 减少程序的阻塞时间,避免无效的CPU消耗;
  • 适应不同的并发规模;

以此为目标实现如下网络模型。

2.1 服务端网络监听

多路复用模型这里采用了epoll方式,如果自己的平台不支持,可以换为select或者poll的方式。

在这里插入图片描述

代码如下:

#include <stdio.h>  
#include <stdlib.h>  
#include <string.h>  
#include <unistd.h>  
#include <arpa/inet.h>  
#include <sys/epoll.h>  
#include <fcntl.h>  
#include <errno.h>  
  
#define MAX_EVENTS 10  
#define BUFFER_SIZE 1024  
#define PORT 8080  
  
// 设置文件描述符为非阻塞模式  
int set_nonblocking(int fd) {  
    int flags, s;  
  
    flags = fcntl(fd, F_GETFL, 0);  
    if (flags == -1) {  
        perror("fcntl F_GETFL");  
        return -1;  
    }  
  
    flags |= O_NONBLOCK;  
    s = fcntl(fd, F_SETFL, flags);  
    if (s == -1) {  
        perror("fcntl F_SETFL");  
        return -1;  
    }  
  
    return 0;  
}  
  
int main() {  
    int listen_fd, conn_fd, nfds, epoll_fd;  
    struct sockaddr_in server_addr;  
    struct epoll_event ev, events[MAX_EVENTS];  
    char buffer[BUFFER_SIZE];  
    ssize_t count;  
  
    // 创建监听socket  
    listen_fd = socket(AF_INET, SOCK_STREAM, 0);  
    if (listen_fd == -1) {  
        perror("socket");  
        exit(EXIT_FAILURE);  
    }  
  
    // 设置非阻塞模式  
    if (set_nonblocking(listen_fd) == -1) {  
        close(listen_fd);  
        exit(EXIT_FAILURE);  
    }  
  
    // 绑定地址和端口  
    server_addr.sin_family = AF_INET;  
    server_addr.sin_addr.s_addr = INADDR_ANY;  
    server_addr.sin_port = htons(PORT);  
    if (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {  
        perror("bind");  
        close(listen_fd);  
        exit(EXIT_FAILURE);  
    }  
  
    // 开始监听  
    if (listen(listen_fd, SOMAXCONN) == -1) {  
        perror("listen");  
        close(listen_fd);  
        exit(EXIT_FAILURE);  
    }  
  
    // 创建epoll实例  
    epoll_fd = epoll_create1(0);  
    if (epoll_fd == -1) {  
        perror("epoll_create1");  
        close(listen_fd);  
        exit(EXIT_FAILURE);  
    }  
  
    // 添加监听socket到epoll实例  
    ev.events = EPOLLIN;  
    ev.data.fd = listen_fd;  
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) == -1) {  
        perror("epoll_ctl: listen_fd");  
        close(listen_fd);  
        close(epoll_fd);  
        exit(EXIT_FAILURE);  
    }  
  
    // 主循环  
    while (1) {  
        nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);  
        if (nfds == -1) {  
            perror("epoll_wait");  
            exit(EXIT_FAILURE);  
        }  
  
        for (int n = 0; n < nfds; ++n) {  
            if (events[n].data.fd == listen_fd) {  
                // 新的连接  
                conn_fd = accept(listen_fd, NULL, NULL);  
                if (conn_fd == -1) {  
                    perror("accept");  
                    continue;  
                }  
  
                // 设置非阻塞模式  
                if (set_nonblocking(conn_fd) == -1) {  
                    close(conn_fd);  
                    continue;  
                }  
  
                // 添加新的连接socket到epoll实例  
                ev.events = EPOLLIN | EPOLLET;  
                ev.data.fd = conn_fd;  
                if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn_fd, &ev) == -1) {  
                    perror("epoll_ctl: conn_fd");  
                    close(conn_fd);  
                }  
            } else {  
                // 处理读事件  
                conn_fd = events[n].data.fd;  
                while ((count = read(conn_fd, buffer, BUFFER_SIZE)) > 0) {  
                    // 处理接收到的数据(这里简单回显)  
                    write(conn_fd, buffer, count);  
                }  
  
                if (count == -1 && errno != EAGAIN) {  
                    // 出现错误或连接关闭  
                    close(conn_fd);  
                } else if (count == 0) {  
                    // 连接关闭  
                    close(conn_fd);  
                }  
  
                // 从epoll实例中移除已关闭的socket  
                if (count <= 0 && errno != EAGAIN) {  
                    ev.events = 0;  
                    epoll_ctl(epoll_fd, EPOLL_CTL_DEL, conn_fd, &ev);  
                }
            }  
        }  
    }  
  
    close(listen_fd);  
    close(epoll_fd);  
    return 0;  
}

说明

  • TCP服务端的基本步骤创建socket,设置为非阻塞模式,绑定IP与端口,开启监听;
  • 这里服务端的socket需要设置为非阻塞模式,因为我们是在单进程中处理多个连接,每个连接不能阻塞等待;
  • 然后加入到epoll监听池中,开始epoll事件的等待;这里只处理接收事件;
  • 如果有服务端socket的接收事件,那么说明有客户端连接消息,进行accep,创建客户端连接的socket;
  • 同样将客户端连接的socket设置为非阻塞,理由同上;加入epoll临听池中,同样也只处理接收事件;
  • 如果有客户端连接的socket上的接收事件,那么说明客户端正在给服务端发消息;
  • 收到客户端消息后,这里只是简单处理,原样再发给客户端;
  • 如果客户端关闭或出错,将客户端连接关闭,并从epoll临听池中移除;

2.2 客户端测试

现在我们来编写一个简单的客户端模拟程序,测试一下多路复用的网络框架。

/*
 * ex020302_client.c
 */

#include <stdio.h>  
#include <stdlib.h>  
#include <string.h>  
#include <unistd.h>  
#include <arpa/inet.h>  
  
#define SERVER_IP "127.0.0.1"  
#define SERVER_PORT 4808  
#define BUFFER_SIZE 1024  

#define CLIENT_SEND_CNT 20

int main() 
{  
    int sockfd;  
    struct sockaddr_in server_addr;  
    char buffer[BUFFER_SIZE] = {0};  
    const char *message = "Hello, Server!";  
  
    // 创建套接字  
    if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) 
    {  
        perror("socket creation failed");  
        exit(EXIT_FAILURE);  
    }  
  
    // 配置服务器地址信息  
    server_addr.sin_family = AF_INET;  
    server_addr.sin_port = htons(SERVER_PORT);  
  
    // 将IP地址从字符串转换为二进制形式  
    if (inet_pton(AF_INET, SERVER_IP, &server_addr.sin_addr) <= 0) 
    {  
        perror("Invalid address/ Address not supported");  
        close(sockfd);  
        exit(EXIT_FAILURE);  
    }  
  
    // 连接到服务器  
    if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) 
    {  
        perror("Connection Failed");  
        close(sockfd);  
        exit(EXIT_FAILURE);  
    }  
  
    for(int i = 0; i < CLIENT_SEND_CNT; i++)
    {
        // 发送消息到服务器  
        send(sockfd, message, strlen(message), 0);  
        printf("Message sent: %s\n", message);  
    
        // 接收服务器的响应  
        int bytes_received = recv(sockfd, buffer, BUFFER_SIZE - 1, 0);  
        if (bytes_received < 0) 
        {  
            perror("Error in receiving");  
        } 
        else if (bytes_received == 0) 
        {  
            printf("Server closed the connection\n");  
        } 
        else 
        {  
            buffer[bytes_received] = '\0'; // 确保字符串以空字符结尾  
            printf("Message received from server: %s\n", buffer);  
        }  

        sleep(1);
    }
  
    // 关闭套接字  
    close(sockfd);  
  
    return 0;  
}

说明

  • TCP客户端建立的基本步骤,创建socket,初始化服务端地址,连接服务器;
  • 然后向服务端发送相同的消息;
  • 每次发送完成后,等待接收消息;

2.3 客户端测试

可以看到,服务端处理客户端的请求时,都是按照接收到的顺序进行串行处理;

当客户端的数量达到成百上千时,对客户端的响应时间就会出现非常明显的延迟,

这种延迟会随着业务的复杂度而放大。

这时就需要充分利用多核CPU硬件资源,来进行并发任务的处理。

三、多进程服务处理


上面是在单个任务进程中处理了监听和大量任务连接的网络处理,各客户端连接的服务会相互影响,实际是串行化处理的。

要让大量的客户端能同时被响应,需要采用多任务的方式,那么在上面的网络模型基础上加入多进程,服务端为每个客户端连接准备一个独立的进程,这样就可以及时响应。

3.1 多进程架构

首先我们利用前面几个章节的介绍,来搭建一个多进程的代码架构,由主进程根据需要进行创建子进程,并且由主进程进行全局的控制。

在这里插入图片描述

/*
 * ex020302_netprocess.c
 */
#include <stdio.h>  
#include <stdlib.h>  
#include <string.h>  
#include <unistd.h>  
#include <arpa/inet.h>  
#include <sys/epoll.h>  
#include <fcntl.h>  
#include <errno.h>  
  
#define MAX_EVENTS 10  
#define BUFFER_SIZE 1024  
#define PORT 4808  

void daemon_fork()
{
	int pid = -1;
	pid = fork();
	if(pid < 0)
	{
		printf("fork error[%s]\n",strerror(errno));
		exit(-1);
	}
	else if(pid > 0)
	{
		// parent exit.
		exit(0);
	}
	else 
	{
		// child daemon
		return;
	}
}

void subprocess(int sock)
{
    int pid = -1;
	pid = fork();
	if(pid < 0)
	{
		printf("fork error[%s]\n",strerror(errno));
		exit(-1);
	}
	else if(pid > 0)
	{
		// parent.
        close(sock);
		return;
	}
	else 
	{
		// child 
        close(listen_fd);
        processMsg(sock);
		exit(0);
	}
}

说明

  • daemon服务程序函数,这个前一章节已经介绍过了,服务端以后台进程的方式运行;
  • 子进程任务处理函数;这里创建的是任务子进程,并在子进程中调用消息处理函数;
  • 这里需要注意的是,在子进程中要关闭服务端的socket,同时在父进程中要关闭客户端连接的socket; 因为父子进程会复制内存空间,但是在各自的进程中,已经不再需要;

3.2 并发网络处理模型

现在就可以将上面的多路复用网络处理放入多进程架构中,处理逻辑进行如下切分:

  • 服务端监听socket初始化,多路复用器的初始化等,都放在主进程中,作为服务端网络初始化的一部分;
  • 每个客户端连接的socket,以及它的读写消息处理逻辑,放在子进程中;这样每个客户端连接对应一个后台服务子进程;
  • 创建子进程的时机,也就是在主进程中接收到新连接时,创建新连接成功后,就可以新建子进程进行处理;
  • 而子进程的退出时间,就是客户端断开连接,或者处理出错时;
void initializeServerNet()
{
    struct sockaddr_in server_addr;  
  
    // 创建监听socket  
    listen_fd = socket(AF_INET, SOCK_STREAM, 0);  
    if (listen_fd == -1) 
    {  
        perror("socket");  
        exit(EXIT_FAILURE);  
    }  
   
    // 绑定地址和端口  
    server_addr.sin_family = AF_INET;  
    server_addr.sin_addr.s_addr = INADDR_ANY;  
    server_addr.sin_port = htons(PORT);  
    if (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) 
    {  
        perror("bind");  
        close(listen_fd);  
        exit(EXIT_FAILURE);  
    }

    // 开始监听
    if (listen(listen_fd, SOMAXCONN) == -1)
    {
        perror("listen");
        close(listen_fd);
        exit(EXIT_FAILURE);
    }
}

void closeServerFd()
{
    close(listen_fd);  
}

void dispatchLoop()
{
    int conn_fd;  

    // 主循环  
    while (1) 
    {  
        // 新的连接  
        conn_fd = accept(listen_fd, NULL, NULL);  
        if (conn_fd == -1) 
        {  
            perror("accept");  
            sleep(1);
            continue;  
        }

        subprocess(conn_fd);
    }  
}

void processMsg(int sock)
{
    char buffer[BUFFER_SIZE];  
    ssize_t count;  

    printf("serv-process:%d start.\n");

    while ((count = read(sock, buffer, BUFFER_SIZE)) > 0)
    {
        // 处理接收到的数据(这里简单回显)
        write(sock, buffer, count);
    }

    if (count == -1 && errno != EAGAIN)
    {
        // 出现错误或连接关闭
        close(sock);
    }
    else if (count == 0)
    {
        // 连接关闭
        close(sock);
    }

    printf("serv-process:%d exit.\n");
}

那么主程序实现如下:

void daemon_fork();
void subprocess(int sock);
void processMsg(int sock);

void initializeServerNet();
void closeServerFd();
void dispatchLoop();

int listen_fd;

int main(int argc ,char *argv[])
{
	daemon_fork();

	initializeServerNet();
    
  dispatchLoop();

  closeServerFd();
	return 0;
}
  • 在主进程中先进程服务端初始化;
  • 然后就可以开始监听,并接收客户端的连接;
  • 当有客户端连接时,就创建客户端连接,并启动子进程与该客户端进行网络通信;
  • 子进程在客户端断开连接或出错时,就会退出;

2.3 客户端测试

可以看到将客户端发送次数调大后,开启的客户端越多,服务端启动的子进程也就会越多;

此时,可以看到服务端每个进程的CPU使用率并不是很高;

但是随着客户端数量越来越多,服务端进程数量超过CPU核数时,就会增加系统的负担;

四、总结


本文主要介绍了基于多进程架构的网络服务器的设计与实现,在多进程架构中每个客户端会有一个服务端的进程专门处理通信,增加了对客户端消息的响应效率,提升了并发处理能力。

结尾


非常感谢大家的支持,在浏览的同时别忘了留下您宝贵的评论,如果觉得值得鼓励,请点赞,收藏,我会更加努力!

作者邮箱:study@senllang.onaliyun.com
如有错误或者疏漏欢迎指出,互相学习。

注:未经同意,不得转载!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/907251.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Linux云计算 |【第五阶段】CLOUD-DAY9

主要内容&#xff1a; Metrics资源利用率监控、存储卷管理&#xff08;临时卷ConfitMap、EmptyDir、持久卷HostPath、NFS(PV/PVC)&#xff09; 一、Metrics介绍 metrics是一个监控系统资源使用的插件&#xff0c;可以监控Node节点上的CPU、内存的使用率&#xff0c;或Pod对资…

Kafka 判断一个节点是否还活着有那两个条件?

大家好&#xff0c;我是锋哥。今天分享关于【Kafka 判断一个节点是否还活着有那两个条件&#xff1f;】面试题&#xff1f;希望对大家有帮助&#xff1b; Kafka 判断一个节点是否还活着有那两个条件&#xff1f; 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 在Ka…

PyQt5实战——多脚本集合包,UI以及工程布局(二)

个人博客&#xff1a;苏三有春的博客 系列往期&#xff1a; PyQt5实战——多脚本集合包&#xff0c;前言与环境配置&#xff08;一&#xff09; 布局 2.1 UI页面布局 整体框架分为分为三个部分&#xff0c;垂直分布。 第一个部分为功能选择按钮&#xff08;如UTF-8转换&#…

Cpp::set map 的理解与使用(22)

文章目录 前言一、预备知识关联式容器键值对 二、set何为set?set的使用set的特点multiset 三、map何为map?map中的operator[ ]multimap 总结 前言 刚学完二叉搜索树&#xff0c;我们马上来感受一下直接与它相关的两个容器吧&#xff01; 一、预备知识 关联式容器 在以往的 S…

PostgreSQL 学习笔记:PostgreSQL 主从复制

PostgreSQL 笔记&#xff1a;PostgreSQL 主从复制 博客地址&#xff1a;TMDOG 的博客 在现代应用程序中&#xff0c;数据库的高可用性和扩展性是至关重要的。PostgreSQL 提供了主从复制功能&#xff0c;可以在多个数据库实例之间复制数据&#xff0c;以实现冗余和负载均衡。本…

SQL,力扣题目1225,报告系统状态的连续日期【窗口函数】

一、力扣链接 LeetCode_1225 二、题目描述 表&#xff1a;Failed ----------------------- | Column Name | Type | ----------------------- | fail_date | date | ----------------------- 该表主键为 fail_date (具有唯一值的列)。 该表包含失败任务的天数.表…

晶台施密特触发器光耦KLH11LX,1MHz高传输速率

晶台推出KLH11LX系列由一个砷化镓红外发光二极管和一个高速集成电路检测器组成&#xff0c;该输出检测器包含了一个施密特触发器&#xff0c;利用其回滞特性&#xff0c;便于脉冲整形&#xff0c;提高抗噪性能。 功能图Functional Diagram 产品特点Product Features •高传输…

Mac在Typora配置PicGo图床,以github为例

Mac配置PicGo图床 0.准备阶段&#xff1a;下载PicGo https://picgo.github.io/PicGo-Doc/zh/guide/ 根据这个链接选择自己的安装方式 1.PicGo已损坏&#xff0c;无法打开 解决方法 打开iTerm,把sudo xattr -d com.apple.quarantine 输入命令行 然后把软件拖入命令行 sudo xa…

「Mac畅玩鸿蒙与硬件23」鸿蒙UI组件篇13 - 自定义组件的创建与使用

自定义组件可以帮助开发者实现复用性强、逻辑清晰的界面模块。通过自定义组件,鸿蒙应用能够提高代码的可维护性,并简化复杂布局的构建。本篇将介绍如何创建自定义组件,如何向组件传递数据,以及如何在不同页面间复用这些组件。 关键词 自定义组件复用组件属性传递组件通信组…

redis模板的应用:自定义redisTemplate序列化规则 (RedisTemplate和StringRedisTemplate)

文章目录 引言I 基础知识redis对key和value使用序列化方式RedisTemplate<Object, Object>自定义redisTemplate序列化规则RedisTemplate<String, String>II 存储自定义对象redisTemplate存储自定义对象StringRedisTemplate存储自定义对象引言 StringRedisTemplate只…

二叉苹果树

AcWing 1074. 二叉苹果树【有依赖背包DP】 - AcWing 问题描述 在一棵有权无向树中&#xff0c;从某个节点&#xff08;这里假设为节点 1&#xff09;出发&#xff0c;遍历树的子节点&#xff0c;每经过一条边会获得对应的权重值。在访问节点数的限制下&#xff08;即体积限制…

Linux基础命令(八) 之 alias ,history,stat,type,特殊符号及命令行快捷键

目录 一&#xff0c;命令别名 alias 常见用法 二&#xff0c;命令历史 history 参数及其作用 常见用法 三.显示文件或文件系统的详细信息 stat 参数及其作用 常见用法 四&#xff0c;显示命令的类型 type 参数及其作用 常见用法 五&#xff0c;特殊符号及命令行快捷…

省级-知识产权保护指数(2012-2022年)

省级知识产权保护指数&#xff08;以下简称“指数”&#xff09;正是衡量各省份在知识产权保护方面表现的一个综合指标&#xff0c;它涵盖了立法、执法、审查和监督等多个维度&#xff0c;全面反映了各省份在知识产权保护方面的综合实力。 2012年-2022年省级-知识产权保护指数…

GraphQL 与 Elasticsearch 相遇:使用 Hasura DDN 构建可扩展、支持 AI 的应用程序

作者&#xff1a;来自 Elastic Praveen Durairaju GraphQL 提供了一种高效且灵活的数据查询方式。本博客将解释 Hasura DDN 如何与 Elasticsearch 配合使用&#xff0c;以实现高性能和元数据驱动的数据访问。 此示例的代码和设置可在此 GitHub 存储库 - elasticsearch-subgraph…

filebeat+elasticsearch+kibana日志分析

1 默认配置 1.1 filebeat filebeat-7.17.yml,从网关中下载k8s的配置&#xff0c;指定es和kibana的配置 通过kibana查询可以查询到日志了&#xff0c;但此时还不知道具体怎么用。 1.2 kibana 在Discover中创建索引格式&#xff1a;filebeat-*&#xff0c;得到如下图&#xf…

Rust 力扣 - 2090. 半径为 k 的子数组平均值

文章目录 题目描述题解思路题解代码题目链接 题目描述 题解思路 半径为 k 的子数组平均值 等价于 子数组长度为2 * k 1的总和 除于 2 * k 1 我们遍历长度为2 * k 1的窗口&#xff0c;我们只需要记录窗口内的平均值即可 题解代码 impl Solution {pub fn get_averages(num…

uniapp的video视频属性打包app后层级过高

问题&#xff1a;在使用uniapp开发APP时&#xff0c;使用video标签显示视频发现H5可以正常展示&#xff0c;但是打包到APP后&#xff0c;它的层级过高&#xff0c;把底部导航都盖住了。 官网说明&#xff1a;uni-app官网 官网给了cover-view组件或plus.nativeObj.view、subNVue…

浅谈UI自动化

⭐️前言⭐️ 本篇文章围绕UI自动化来展开&#xff0c;主要内容包括什么是UI自动化&#xff0c;常用的UI自动化框架&#xff0c;UI自动化原理等。 &#x1f349;欢迎点赞 &#x1f44d; 收藏 ⭐留言评论 &#x1f349;博主将持续更新学习记录收获&#xff0c;友友们有任何问题…

Vue3+Data-V实现可视化大屏页面布局

目录 一、前言 二、环境准备 1.Vue3安装npm create vuelatest 2.Data-V配置 项目Data-v安装 main.js中注册Data-v到全局 ​编辑可使用按需引入 3.测试 三、导航栏路由跳转配置 1.子组件mainNav组件准备 2.父组件准备导航栏参数传递 3.子组件接收父组件参数 4.导航…

Python 使用 LSTM 进行情感分析:处理文本序列数据的指南

使用 LSTM 进行情感分析&#xff1a;处理文本序列数据的指南 长短期记忆网络&#xff08;LSTM&#xff09;是一种适合处理序列数据的深度学习模型&#xff0c;广泛应用于情感分析、语音识别、文本生成等领域。它通过在训练过程中“记住”过去的数据特征来理解和预测序列数据的…