Linux编程:基于 Unix Domain Socket 的进程/线程间通信实时性优化

文章目录

    • 0. 引言
    • 1. 使用 `epoll` 边缘触发模式
      • 非不要不选择阻塞模式
      • 边缘触发(ET)模式
      • 优点
      • 示例
    • 2. 使用实时调度策略
    • 3. CPU 绑定
    • 4. 使用无锁缓冲区
    • 5. 优化消息传递的大小和频率
    • 6. 使用 `SO_RCVTIMEO` 和 `SO_SNDTIMEO`
    • 7. 示例代码
    • 其他阅读

0. 引言

前几天被问到“如何优化Linux中Domain Socket的线程间通信实时性?”当时的回答感觉不够好,经过仔细思考后,我整理出以下优化策略,考虑的是高并发和低延迟场景中的应用优化。

1. 使用 epoll 边缘触发模式

非不要不选择阻塞模式

阻塞式 read() 在单客户端的情况下,能够立即响应数据的到达,但其局限性在于:

  • 无法同时处理多个 I/O 操作。如果同时需要接收和发送数据,阻塞式 read() 会在读取数据时阻塞当前线程,直到数据可用,这使得线程无法在等待数据时执行其他任务(例如发送数据)。 也就是处理双向通信不够高效。
  • 阻塞导致线程空闲。即使线程处于阻塞状态,系统仍需要为其调度,但线程无法做任何实际工作。这样会浪费 CPU 时间,降低系统的响应性和资源利用率。

边缘触发(ET)模式

epoll边缘触发 模式(ET)在文件描述符的状态发生变化时仅触发一次事件。当状态从“不可读”变为“可读”时,epoll 只会通知一次,后续不会触发事件直到状态再次变化。这减少了重复触发事件的系统调用,降低了上下文切换的频率。

优点

  • 减少系统调用和上下文切换:边缘触发模式比水平触发模式(LT)减少了不必要的系统调用。
  • 更低延迟:每个事件只触发一次,避免了多次触发导致的等待时间。
  • 更高效率:配合非阻塞 I/O 使用,避免了重复的事件通知。

示例

struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;  // 设置为边缘触发模式
ev.data.fd = sockfd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev) == -1) {
    perror("epoll_ctl");
    exit(EXIT_FAILURE);
}

2. 使用实时调度策略

Linux 提供了 SCHED_FIFOSCHED_RR 等实时调度策略,可以降低调度延迟。通过 sched_setscheduler() 函数设置线程调度策略,有助于提升线程的响应速度。

struct sched_param param;
param.sched_priority = 99;  // 设置较高的优先级
sched_setscheduler(pid, SCHED_FIFO, &param);  // 设置实时调度策略

3. CPU 绑定

将线程绑定到特定的 CPU 核,减少跨核调度和缓存失效,降低延迟。

cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(cpu_id, &cpuset);  // 将线程绑定到指定的 CPU 核
pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);

4. 使用无锁缓冲区

使用无锁缓冲区可以减少CPU时间片切换次数:

  • 无锁队列:使用原子操作管理数据结构,避免传统锁机制的性能瓶颈,减少线程同步的开销。

实现请见:C++生产者-消费者无锁缓冲区的简单实现

5. 优化消息传递的大小和频率

每次发送或接收的数据大小直接影响通信延迟。频繁的小数据传输会增加 I/O 操作次数,导致延迟增加。优化措施包括:

  • 批量传输:将多个小消息合并为一个大消息,减少系统调用次数和上下文切换频率。
  • 调整缓冲区大小:根据应用需求调整套接字的发送和接收缓冲区大小,以避免缓冲区过小导致频繁的上下文切换。
int bufsize = 8192;  // 请根据实际设置合适的缓冲区大小
setsockopt(socket_fd, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize));
setsockopt(socket_fd, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize));

6. 使用 SO_RCVTIMEOSO_SNDTIMEO

SO_RCVTIMEOSO_SNDTIMEO 是用来防止套接字在接收或发送数据时无限期阻塞的选项。当设置了这些超时选项后,套接字在等待数据时会在超时后返回错误(如 EAGAINEWOULDBLOCK),从而提高应用程序的响应性。然而,这些选项不能直接解决由于 CPU 调度延迟引起的实时性问题。它们的作用仅仅是在指定时间内没有完成操作时返回错误,而不是保证操作在一定时间内完成。

// 设置接收超时时间
struct timeval recv_timeout = { 1, 0 }; // 1 seconds
if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &recv_timeout, sizeof(recv_timeout)) == -1) {
    perror("setsockopt SO_RCVTIMEO");
    close(sock);
    exit(EXIT_FAILURE);
}

// 设置发送超时时间
struct timeval send_timeout = { 1, 0 }; // 1 seconds
if (setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &send_timeout, sizeof(send_timeout)) == -1) {
    perror("setsockopt SO_SNDTIMEO");
    close(sock);
    exit(EXIT_FAILURE);
}

7. 示例代码

// g++ -o uds_server uds_server.cpp -pthread
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/un.h>
#include <cstring>
#include <cerrno>
#include <atomic>
#include <pthread.h>
#include <sched.h>

#define SOCKET_PATH "/tmp/uds_socket"
#define MAX_EVENTS 10
#define BUF_SIZE 1024
#define SOCKET_BACKLOG 5

// 无锁环形缓冲区
class LockFreeBytesBuffer {
public:
    static const std::size_t kBufferSize = 10240U;  // 缓冲区大小

    LockFreeBytesBuffer() noexcept : readerIndex_(0U), writerIndex_(0U) {
        std::memset(buffer_, 0, kBufferSize);
    }

    bool append(const char* data, std::size_t length) noexcept;
    std::size_t beginRead(const char** target) noexcept;
    void endRead(std::size_t length) noexcept;

private:
    char buffer_[kBufferSize];
    std::atomic<std::size_t> readerIndex_;
    std::atomic<std::size_t> writerIndex_;
};

bool LockFreeBytesBuffer::append(const char* data, std::size_t length) noexcept {
    const std::size_t currentWriteIndex = writerIndex_.load(std::memory_order_relaxed);
    const std::size_t currentReadIndex = readerIndex_.load(std::memory_order_acquire);

    const std::size_t freeSpace = (currentReadIndex + kBufferSize - currentWriteIndex - 1U) % kBufferSize;
    if (length > freeSpace) {
        return false;  // 缓冲区满
    }

    const std::size_t pos = currentWriteIndex % kBufferSize;
    const std::size_t firstPart = std::min(length, kBufferSize - pos);
    std::memcpy(&buffer_[pos], data, firstPart);
    std::memcpy(&buffer_[0], data + firstPart, length - firstPart);

    writerIndex_.store(currentWriteIndex + length, std::memory_order_release);
    return true;
}

std::size_t LockFreeBytesBuffer::beginRead(const char** target) noexcept {
    const std::size_t currentReadIndex = readerIndex_.load(std::memory_order_relaxed);
    const std::size_t currentWriteIndex = writerIndex_.load(std::memory_order_acquire);

    const std::size_t availableData = (currentWriteIndex - currentReadIndex) % kBufferSize;
    if (availableData == 0U) {
        return 0U;  // 缓冲区空
    }

    const std::size_t pos = currentReadIndex % kBufferSize;
    *target = &buffer_[pos];
    return std::min(availableData, kBufferSize - pos);
}

void LockFreeBytesBuffer::endRead(std::size_t length) noexcept {
    const std::size_t currentReadIndex = readerIndex_.load(std::memory_order_relaxed);
    readerIndex_.store(currentReadIndex + length, std::memory_order_release);
}

// 设置套接字为非阻塞
int setSocketNonBlocking(int sockfd) {
    int flags = fcntl(sockfd, F_GETFL, 0);
    if (flags == -1) {
        fprintf(stderr, "Error getting socket flags: %s\n", strerror(errno));
        return -1;
    }

    if (fcntl(sockfd, F_SETFL, flags | O_NONBLOCK) == -1) {
        fprintf(stderr, "Error setting socket to non-blocking: %s\n", strerror(errno));
        return -1;
    }

    return 0;
}

// 设置实时调度策略
void setRealTimeScheduling() {
    struct sched_param param;
    param.sched_priority = 99;  // 设置较高的优先级
    if (sched_setscheduler(0, SCHED_FIFO, &param) == -1) {
        fprintf(stderr, "Error setting real-time scheduler: %s\n", strerror(errno));
        exit(EXIT_FAILURE);
    }
}

// 绑定线程到指定 CPU
void setThreadAffinity(int cpuId) {
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(cpuId, &cpuset);
    if (pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset) != 0) {
        fprintf(stderr, "Error setting thread affinity: %s\n", strerror(errno));
        exit(EXIT_FAILURE);
    }
}

// 处理新连接
void handleNewConnection(int epollFd, int sockfd) {
    struct epoll_event ev;
    int connfd = accept(sockfd, nullptr, nullptr);
    if (connfd == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return;
        }
        fprintf(stderr, "Error accepting connection: %s\n", strerror(errno));
        return;
    }

    if (setSocketNonBlocking(connfd) == -1) {
        close(connfd);
        return;
    }

    ev.events = EPOLLIN | EPOLLET;  // 设置为边缘触发模式
    ev.data.fd = connfd;
    if (epoll_ctl(epollFd, EPOLL_CTL_ADD, connfd, &ev) == -1) {
        fprintf(stderr, "Error adding connection to epoll: %s\n", strerror(errno));
        close(connfd);
    }
}

// 处理读取数据
void handleRead(int epollFd, struct epoll_event& event, LockFreeBytesBuffer& buffer) {
    char buf[BUF_SIZE];
    ssize_t nread = read(event.data.fd, buf, sizeof(buf));
    if (nread == -1) {
        if (errno != EAGAIN) {
            fprintf(stderr, "Error reading data: %s\n", strerror(errno));
            epoll_ctl(epollFd, EPOLL_CTL_DEL, event.data.fd, nullptr);
            close(event.data.fd);
        }
    } else if (nread == 0) {
        epoll_ctl(epollFd, EPOLL_CTL_DEL, event.data.fd, nullptr);
        close(event.data.fd);  // 连接关闭
    } else {
        fprintf(stdout, "Received data: %.*s\n", static_cast<int>(nread), buf);
        if (!buffer.append(buf, nread)) {
            fprintf(stderr, "Error appending to buffer: Buffer overflow!\n");
        }
    }
}

// 处理写操作
void handleWrite(int epollFd, struct epoll_event& event, LockFreeBytesBuffer& buffer) {
    const char* data;
    std::size_t len = buffer.beginRead(&data);
    if (len > 0) {
        ssize_t nwrite = write(event.data.fd, data, len);
        if (nwrite == -1) {
            if (errno != EAGAIN) {
                fprintf(stderr, "Error writing data: %s\n", strerror(errno));
                epoll_ctl(epollFd, EPOLL_CTL_DEL, event.data.fd, nullptr);
                close(event.data.fd);
            }
        } else {
            buffer.endRead(nwrite);
        }
    }
}

// 主函数
int main() {
    int sockfd, epollFd;
    struct sockaddr_un addr;
    struct epoll_event ev, events[MAX_EVENTS];

    // 设置实时调度
    setRealTimeScheduling();
    // 设置线程亲和性
    setThreadAffinity(0);  // 绑定到 CPU 0

    // 创建 Unix Domain Socket
    sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (sockfd == -1) {
        fprintf(stderr, "Error creating socket: %s\n", strerror(errno));
        return EXIT_FAILURE;
    }

    // 设置套接字为非阻塞
    if (setSocketNonBlocking(sockfd) == -1) {
        close(sockfd);
        return EXIT_FAILURE;
    }

    // 绑定套接字到文件路径
    std::memset(&addr, 0, sizeof(struct sockaddr_un));
    addr.sun_family = AF_UNIX;
    std::strcpy(addr.sun_path, SOCKET_PATH);
    unlink(SOCKET_PATH);

    if (bind(sockfd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)) == -1) {
        fprintf(stderr, "Error binding socket: %s\n", strerror(errno));
        close(sockfd);
        return EXIT_FAILURE;
    }

    // 监听连接请求
    if (listen(sockfd, SOCKET_BACKLOG) == -1) {
        fprintf(stderr, "Error listening on socket: %s\n", strerror(errno));
        close(sockfd);
        return EXIT_FAILURE;
    }

    // 创建 epoll 实例
    epollFd = epoll_create1(0);
    if (epollFd == -1) {
        fprintf(stderr, "Error creating epoll instance: %s\n", strerror(errno));
        close(sockfd);
        return EXIT_FAILURE;
    }

    // 将服务器套接字加入 epoll
    ev.events = EPOLLIN | EPOLLET;  // 边缘触发模式
    ev.data.fd = sockfd;
    if (epoll_ctl(epollFd, EPOLL_CTL_ADD, sockfd, &ev) == -1) {
        fprintf(stderr, "Error adding socket to epoll: %s\n", strerror(errno));
        close(sockfd);
        close(epollFd);
        return EXIT_FAILURE;
    }

    LockFreeBytesBuffer buffer;

    // 主循环,等待并处理事件
    while (true) {
        int n = epoll_wait(epollFd, events, MAX_EVENTS, -1);
        if (n == -1) {
            fprintf(stderr, "Error in epoll_wait: %s\n", strerror(errno));
            break;
        }

        for (int i = 0; i < n; i++) {
            if (events[i].data.fd == sockfd) {
                // 处理新连接
                handleNewConnection(epollFd, sockfd);
            } else if (events[i].events & EPOLLIN) {
                // 处理读取数据
                handleRead(epollFd, events[i], buffer);
            } else if (events[i].events & EPOLLOUT) {
                // 处理写操作
                handleWrite(epollFd, events[i], buffer);
            }
        }
    }

    close(epollFd);
    close(sockfd);
    return EXIT_SUCCESS;
}

这个程序监听 Unix 域套接字 /tmp/uds_socket,能够处理多个客户端的连接,并异步地读取和写入数据:

  • 监听和接受连接:服务器首先通过 bindlisten 绑定套接字,然后通过 accept 等待来自客户端的连接。
  • 异步 I/O 事件处理:使用 epoll 来监听并处理事件(如接收数据、发送数据、错误等)。
  • epoll边缘触发:通过设置非阻塞 I/O 和边缘触发模式,程序能够高效地处理大量并发连接。
  • 缓冲区管理:使用环形缓冲区管理接收的数据。

其他阅读

  • 非Domain Socket的优化请参考:Linux编程:嵌入式ARM平台Linux网络实时性能优化
  • Linux 编程:高实时性场景下的内核线程调度与网络包发送优化
  • Linux I/O编程:I/O多路复用与异步 I/O对比

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/915382.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

搭建Python2和Python3虚拟环境

搭建Python3虚拟环境 1. 更新pip2. 搭建Python3虚拟环境第一步&#xff1a;安装python虚拟化工具第二步&#xff1a; 创建虚拟环境 3. 搭建Python2虚拟环境第一步&#xff1a;安装虚拟环境模块第二步&#xff1a;创建虚拟环境 4. workon命令管理虚拟机第一步&#xff1a;安装扩…

C语言的内存函数(文章后附gitee链接,模拟实现函数)

之前我们已经讲解过了字符型数据的一类字符串函数&#xff0c; 现在我们来讨论字符型以外的数据处理。 1&#xff1a;memcpy 的使用和模拟实现 void * memcpy ( void * destination, const void * source, size_t num )&#xff1b; 注意&#xff1a; 1&#xff1a;函数memcp…

FPGA/Verilog,Quartus环境下if-else语句和case语句RT视图对比/学习记录

基本概念 RTL&#xff08;Register - Transfer - Level&#xff09;视图&#xff1a;是一种硬件描述语言的抽象层次&#xff0c;用于描述数字电路中寄存器之间的数据传输和操作。在这个层次上&#xff0c;可以看到电路的基本结构&#xff0c;如寄存器、组合逻辑、多路复用器等…

react的创建与书写

一&#xff1a;创建项目 超全面详细一条龙教程&#xff01;从零搭建React项目全家桶&#xff08;上篇&#xff09; - 知乎 1.创建一个文件夹&#xff0c;shift鼠标右键选择在此处打开powershell 2.为了加速npm下载速度&#xff0c;先把npm设置为淘宝镜像地址。 npm config s…

【动态规划】两个数组的 dp 问题

1. 最长公共子序列 1143. 最长公共子序列 状态表示&#xff1a; dp[i][j] 表示 s1 的 0 ~ i 区间和 s2 的 0 ~ j 区间内所有子序列中&#xff0c;最长公共子序列的长度 状态转移方程&#xff1a; 当 s1[i] 和 s2[j] 相等时&#xff0c;那么最长公共子序列一定是以这两个位置…

【计算机网络】【传输层】【习题】

计算机网络-传输层-习题 文章目录 10. 图 5-29 给出了 TCP 连接建立的三次握手与连接释放的四次握手过程。根据 TCP 协议的工作原理&#xff0c;请填写图 5-29 中 ①~⑧ 位置的序号值。答案技巧 注&#xff1a;本文基于《计算机网络》&#xff08;第5版&#xff09;吴功宜、吴英…

nacos集群部署与配置

Nacos集群模式 1. 预备环境准备 请确保是在环境中安装使用: 64 bit OS Linux/Unix/Mac&#xff0c;推荐使用Linux系统。64 bit JDK 1.8&#xff1b;下载. 配置。Maven 3.2.x&#xff1b;下载. 配置。3个或3个以上Nacos节点才能构成集群 ubuntu中假如没安装jdk&#xff0c;则…

Python学习从0到1 day26 第三阶段 Spark ③ 数据计算 Ⅱ

目录 一、Filter方法 功能 语法 代码 总结 filter算子 二、distinct方法 功能 语法 代码 总结 distinct算子 三、SortBy方法 功能 语法 代码 总结 sortBy算子 四、数据计算练习 需求&#xff1a; 解答 总结 去重函数&#xff1a; 过滤函数&#xff1a; 转换函数&#xff1a; 排…

Jmeter基础篇(23)TPS和QPS的异同

前言 这是一篇性能测试指标的科普文章哦&#xff01; TPS和QPS是同一个概念吗&#xff1f; TPS&#xff08;Transactions Per Second&#xff09;和QPS&#xff08;Queries Per Second&#xff09;虽然都是衡量系统性能的指标&#xff0c;但是它们并不是同一个概念。这两个各…

IEC60870-5-104 协议源码架构详细分析

IEC60870-5-104 协议源码架构 前言一、资源三、目录层级一二、目录层级二config/lib60870_config.hdependencies/READMEexamplesCMakeLists.txtcs101_master_balancedcs104_client_asyncmulti_client_servertls_clienttls_server说明 make这些文件的作用是否需要导入这些文件&a…

机器学习在网络安全中的应用

&#x1f493; 博客主页&#xff1a;瑕疵的CSDN主页 &#x1f4dd; Gitee主页&#xff1a;瑕疵的gitee主页 ⏩ 文章专栏&#xff1a;《热点资讯》 机器学习在网络安全中的应用 机器学习在网络安全中的应用 机器学习在网络安全中的应用 引言 机器学习概述 定义与原理 发展历程 …

JUC基础类-AbstractQueuedSynchronizer

AbstractQueuedSynchronizer 1、AbstractQueuedSynchronizer概述2、AbstractQueuedSynchronizer源码分析2.1 AQS源码2.2 Node类 如有侵权&#xff0c;请联系&#xff5e; 如有问题&#xff0c;也欢迎批评指正&#xff5e; 1、AbstractQueuedSynchronizer概述 AbstractQueuedSy…

文献阅读 | Nature Methods:使用 STAMP 对空间转录组进行可解释的空间感知降维

文献介绍 文献题目&#xff1a; 使用 STAMP 对空间转录组进行可解释的空间感知降维 研究团队&#xff1a; 陈金妙&#xff08;新加坡科学技术研究局&#xff09; 发表时间&#xff1a; 2024-10-15 发表期刊&#xff1a; Nature Methods 影响因子&#xff1a; 36.1&#xff0…

Redis系列之底层数据结构ZipList

Redis系列之底层数据结构ZipList 实验环境 Redis 6.0 什么是Ziplist&#xff1f; Ziplist&#xff0c;压缩列表&#xff0c;这种数据结构会根据存入数据的类型和大小&#xff0c;分配大小不同的空间&#xff0c;所以是为了节省内存而采用的。因为这种数据结构是一种完整连续…

界面控件DevExpress WPF中文教程:TreeList视图及创建分配视图

DevExpress WPF拥有120个控件和库&#xff0c;将帮助您交付满足甚至超出企业需求的高性能业务应用程序。通过DevExpress WPF能创建有着强大互动功能的XAML基础应用程序&#xff0c;这些应用程序专注于当代客户的需求和构建未来新一代支持触摸的解决方案。 无论是Office办公软件…

数据结构中数据有序性/ 单调性 ——二分查找

以下记录的都是闭区间写法 例题&#xff1a;34. 在排序数组中查找元素的第一个和最后一个位置 1.关系转换 寻找目标值有四种情况&#xff1a;≥、>、≤、< 比如目标值x&#xff0c; 可以转化为 ≥x、≥ x1、≤x、≤ x1 比如数组大小为6&#xff0c;目标值为…

探索Python的HTTP利器:Requests库的神秘面纱

文章目录 **探索Python的HTTP利器&#xff1a;Requests库的神秘面纱**一、背景&#xff1a;为何选择Requests库&#xff1f;二、Requests库是什么&#xff1f;三、如何安装Requests库&#xff1f;四、Requests库的五个简单函数使用方法1. GET请求2. POST请求3. PUT请求4. DELET…

《Linux从小白到高手》综合应用篇:深入详解Linux swap及其调整优化

1. 引言&#xff1a; Swap是存储设备上的一块空间&#xff08;分区&#xff09;&#xff0c;操作系统可以在这里暂存一些内存里放不下的东西。这从某种程度上相当于增加了服务器的可用内存。虽然从swap读写比内存慢&#xff0c;但总比没有好&#xff0c;算是内存不足时一种比较…

SpringMVC学习笔记(一)

一、SpringMVC的基本概念 &#xff08;一&#xff09;三层架构和MVC 1、三层架构概述 我们的开发架构一般都是基于两种形式&#xff0c;一种是 C/S 架构&#xff0c;也就是客户端/服务器&#xff0c;另一种是 B/S 架构&#xff0c;也就是浏览器服务器。在 JavaEE 开发中&…

小面馆叫号取餐流程 佳易王面馆米线店点餐叫号管理系统操作教程

一、概述 【软件资源文件下载在文章最后】 小面馆叫号取餐流程 佳易王面馆米线店点餐叫号管理系统操作教程 点餐软件以其实用的功能和简便的操作&#xff0c;为小型餐饮店提供了高效的点餐管理解决方案&#xff0c;提高了工作效率和服务质量 ‌点餐管理‌&#xff1a;支持电…