06 | Swoole 源码分析之 Coroutine 协程模块

首发原文链接:Swoole 源码分析之 Coroutine 协程模块
大家好,我是码农先森。

引言

协程又称轻量级线程,但与线程不同的是;协程是用户级线程,不需要操作系统参与。由用户显式控制,可以在需要的时候挂起、或恢复执行。

通过协程程序可以在执行的过程中保存当前的状态,并在恢复后从该状态处继续执行,整体上来说创建、销毁、切换的成本低。

但在 Swoole 中的协程是无法利用多核 CPU 的,如果想利用多核 CPU 则需要依赖 Swoole 的多进程模型。

协程的出现为 Swoole 程序提升并发效率、及系统的处理能力,注入了强劲的动力;可以说是 Swoole 作为高性能通信框架的的核心模块。

源码拆解

这次我们以下面这段代码,来作为本次拆解源码的切入点。

// 协程容器
Swoole\Coroutine\run(function () {
    // Socket 协程客户端
    $socket = new Swoole\Coroutine\Socket(AF_INET, SOCK_STREAM, 0);
    // 建立连接,在建立连接的过程中会发生协程切换
    $retval = $socket->connect('127.0.0.1', 9601);
    if ($retval) {
        // 发送数据,在发送数据的过程中会发生协程切换
        $n = $socket->send('hello');
        var_dump($n);

        // 解释数据,在接收数据的过程中会发生协程切换
        $data = $socket->recv();
        var_dump($data);

        // 关闭连接
        $socket->close();
    }
});

这段代码主要是使用 Socket 的协程客户端与本地的 9601 端口建立连接,并且发送、接收数据。在分析源码之前,我对这次的源码做了一个图解梳理,把整个调用链路上的函数串联了起来。我们可以先对整体有个大致的了解,便于后面分析源代码。

Socket 协程客户端

Socket 协程客户端是专门用于 Swoole 在协程环境中使用的,可以实现在 IO 调用时切换协程,让出 CPU 的使用权。例如:在连接建立、发送数据、接收数据 等阶段会进行协程的切换。

这个函数主要是发起 Socket 连接的建立,并且在 wait_event 这个函数内部实现了协程的切换。

// swoole-src/src/coroutine/socket.cc:595
bool Socket::connect(const struct sockaddr *addr, socklen_t addrlen) {
    if (sw_unlikely(!is_available(SW_EVENT_RDWR))) {
        return false;
    }
    int retval;
    do {
        // 发起连接建立
        retval = ::connect(sock_fd, addr, addrlen);
    } while (retval < 0 && errno == EINTR);
    if (retval < 0) {
        if (errno != EINPROGRESS) {
            set_err(errno);
            return false;
        } else {
            TimerController timer(&write_timer, connect_timeout, this, timer_callback);
            // wait_event 这个函数内部实现了协程的切换
            if (!timer.start() || !wait_event(SW_EVENT_WRITE)) {
                if (is_closed()) {
                    set_err(ECONNABORTED);
                }
                return false;
            } else {
                if (socket->get_option(SOL_SOCKET, SO_ERROR, &errCode) < 0 || errCode != 0) {
                    set_err(errCode);
                    return false;
                }
            }
        }
    }
    connected = true;
    set_err(0);
    return true;
}

再看看 wait_event 函数的内部实现,先是获取到当前的协程,然后根据事件的类型调用函数 add_event 将事件添加到事件管理的结构体中,最后将当前的协程切换出去,让出其 CPU 的控制权。

// swoole-src/src/coroutine/socket.cc:147
bool Socket::wait_event(const EventType event, const void **__buf, size_t __n) {
    EventType added_event = event;
    // 获取到当前的协程
    Coroutine *co = Coroutine::get_current_safe();
    if (!co) {
        return false;
    }
    if (sw_unlikely(socket->close_wait)) {
        set_err(SW_ERROR_CO_SOCKET_CLOSE_WAIT);
        return false;
    }

    // clear the last errCode
    set_err(0);
#ifdef SW_USE_OPENSSL
    // 根据事件的类型调用函数 add_event 将事件添加到事件管理的结构体中
    if (sw_unlikely(socket->ssl && ((event == SW_EVENT_READ && socket->ssl_want_write) ||
                                    (event == SW_EVENT_WRITE && socket->ssl_want_read)))) {
        if (sw_likely(socket->ssl_want_write && add_event(SW_EVENT_WRITE))) {
            want_event = SW_EVENT_WRITE;
        } else if (socket->ssl_want_read && add_event(SW_EVENT_READ)) {
            want_event = SW_EVENT_READ;
        } else {
            return false;
        }
        added_event = want_event;
    } else
#endif
        if (sw_unlikely(!add_event(event))) {
        return false;
    }
    swoole_trace_log(SW_TRACE_SOCKET,
                     "socket#%d blongs to cid#%ld is waiting for %s event",
                     sock_fd,
                     co->get_cid(),
                     get_wait_event_name(this, event));

    Coroutine::CancelFunc cancel_fn = [this, event](Coroutine *co) { return cancel(event); };

    // 将当前的协程切换出去,让出其 CPU 的控制权
    if (sw_likely(event == SW_EVENT_READ)) {
        read_co = co;
        read_co->yield(&cancel_fn);
        read_co = nullptr;
    } else if (event == SW_EVENT_WRITE) {
        if (sw_unlikely(!zero_copy && __n > 0 && *__buf != get_write_buffer()->str)) {
            write_buffer->clear();
            if (write_buffer->append((const char *) *__buf, __n) != SW_OK) {
                set_err(ENOMEM);
                goto _failed;
            }
            *__buf = write_buffer->str;
        }
        write_co = co;
        write_co->yield(&cancel_fn);
        write_co = nullptr;
    } else {
        assert(0);
        return false;
    }
_failed:
#ifdef SW_USE_OPENSSL
    // maybe read_co and write_co are all waiting for the same event when we use SSL
    if (sw_likely(want_event == SW_EVENT_NULL || !has_bound()))
#endif
    {
        Reactor *reactor = SwooleTG.reactor;
        if (sw_likely(added_event == SW_EVENT_READ)) {
            reactor->remove_read_event(socket);
        } else {
            reactor->remove_write_event(socket);
        }
    }
#ifdef SW_USE_OPENSSL
    want_event = SW_EVENT_NULL;
#endif
    swoole_trace_log(SW_TRACE_SOCKET,
                     "socket#%d blongs to cid#%ld trigger %s event",
                     sock_fd,
                     co->get_cid(),
                     get_trigger_event_name(this, added_event));
    return !is_closed() && !errCode;
}

同理 send()recv() 函数,也和 connect() 函数是一样的实现方式。

// swoole-src/src/coroutine/socket.cc:847
ssize_t Socket::send(const void *__buf, size_t __n) {
    if (sw_unlikely(!is_available(SW_EVENT_WRITE))) {
        return -1;
    }
    ssize_t retval;
    TimerController timer(&write_timer, write_timeout, this, timer_callback);
    do {
        // 发送数据
        retval = socket->send(__buf, __n, 0);
    } while (retval < 0 && socket->catch_write_error(errno) == SW_WAIT && timer.start() &&
             wait_event(SW_EVENT_WRITE, &__buf, __n));
    check_return_value(retval);
    return retval;
}

// swoole-src/src/coroutine/socket.cc:874
ssize_t Socket::recv(void *__buf, size_t __n) {
    if (sw_unlikely(!is_available(SW_EVENT_READ))) {
        return -1;
    }
    ssize_t retval;
    TimerController timer(&read_timer, read_timeout, this, timer_callback);
    do {
        // 接收数据
        retval = socket->recv(__buf, __n, 0);
    } while (retval < 0 && socket->catch_read_error(errno) == SW_WAIT && timer.start() && wait_event(SW_EVENT_READ));
    check_return_value(retval);
    return retval;
}

也是调用 wait_event() 函数来实现当前的协程切换,唯一的区别就是事件的类型不同,一个是读事件,一个是写事件。

Run 协程容器

在 Swoole 中要想使用协程,那么必须要在协程的环境中使用协程的客户端,或者支持 Hook 的原生 PHP 函数。才能享受到 Swoole 中协程带来的高性能,不然和普通的 PHP 执行程序没有什么区别,变成了同步阻塞。

在源码中协程容器主要是实现了事件循环的初始化、协程上下文的创建管理、事件循环的 IO 事件监听,接下来我们会主要分析关于事件管理的部分内容。

// swoole-src/src/coroutine/base.cc:210
namespace coroutine {
    bool run(const CoroutineFunc &fn, void *arg) {
        // 事件循环的初始化
        if (swoole_event_init(SW_EVENTLOOP_WAIT_EXIT) < 0) {
            return false;
        }
        // 协程上下文的创建管理
        Coroutine::activate();
        long cid = Coroutine::create(fn, arg);
        // 事件循环的 IO 事件监听
        swoole_event_wait();
        Coroutine::deactivate();
        return cid > 0;
    }
}

Event 事件初始化

Event 事件初始化主要是定义一些事件的回调函数,便于在事件被触发时恢复对应的协程进行后续的逻辑处理,例如:读事件回调函数 readable_event_callback、写事件回调函数 writable_event_callback 等。

// swoole-src/src/wrapper/event.cc:37
int swoole_event_init(int flags) {
    if (!SwooleG.init) {
        std::unique_lock<std::mutex> lock(init_lock);
        swoole_init();
    }
    
    // 创建一个 Reactor 实例对象
    Reactor *reactor = new Reactor(SW_REACTOR_MAXEVENTS);
    if (!reactor->ready()) {
        return SW_ERR;
    }

    if (flags & SW_EVENTLOOP_WAIT_EXIT) {
        reactor->wait_exit = 1;
    }
    
    // Socket 事件初始化
    coroutine::Socket::init_reactor(reactor);
    coroutine::System::init_reactor(reactor);
    network::Client::init_reactor(reactor);

    SwooleTG.reactor = reactor;

    return SW_OK;
}
// swoole-src/include/swoole_coroutine_sokcet.h:157
static inline void init_reactor(Reactor *reactor) {
    // 定义对应事件的回调函数
    reactor->set_handler(SW_FD_CO_SOCKET | SW_EVENT_READ, readable_event_callback);
    reactor->set_handler(SW_FD_CO_SOCKET | SW_EVENT_WRITE, writable_event_callback);
    reactor->set_handler(SW_FD_CO_SOCKET | SW_EVENT_ERROR, error_event_callback);
}
// swoole-src/src/coroutine/socket.c:48
int Socket::readable_event_callback(Reactor *reactor, Event *event) {
    Socket *socket = (Socket *) event->socket->object;
    socket->set_err(0);
#ifdef SW_USE_OPENSSL
    if (sw_unlikely(socket->want_event != SW_EVENT_NULL)) {
        if (socket->want_event == SW_EVENT_READ) {
            // 恢复对应的协程
            socket->write_co->resume();
        }
    } else
#endif
    {
        if (socket->recv_barrier && (*socket->recv_barrier)() && !event->socket->event_hup) {
            return SW_OK;
        }
        // 恢复对应的协程
        socket->read_co->resume();
    }

    return SW_OK;
}

Event 事件监听

Event 事件监听主要是针对被加入到事件循环中的 Socket 进行 IO 事件的监听,如果有读或写 IO 事件的触发,则回调到对应的处理函数上进行执行。

// swoole-src/src/warpper/event.cc:84
int swoole_event_wait() {
    Reactor *reactor = SwooleTG.reactor;
    int retval = 0;
    if (!reactor->wait_exit or !reactor->if_exit()) {
        // 事件循环等待调用
        retval = reactor->wait(nullptr);
    }
    swoole_event_free();
    return retval;
}
// swoole-src/src/reactor/epoll.cc:153
int ReactorEpoll::wait(struct timeval *timeo) {
    Event event;
    ReactorHandler handler;
    int i, n, ret;

    int reactor_id = reactor_->id;
    int max_event_num = reactor_->max_event_num;

    if (reactor_->timeout_msec == 0) {
        if (timeo == nullptr) {
            reactor_->timeout_msec = -1;
        } else {
            reactor_->timeout_msec = timeo->tv_sec * 1000 + timeo->tv_usec / 1000;
        }
    }

    reactor_->before_wait();

    while (reactor_->running) {
        if (reactor_->onBegin != nullptr) {
            reactor_->onBegin(reactor_);
        }
        // 监听 IO 事件
        n = epoll_wait(epfd_, events_, max_event_num, reactor_->get_timeout_msec());
        if (n < 0) {
            if (!reactor_->catch_error()) {
                swoole_sys_warning("[Reactor#%d] epoll_wait failed", reactor_id);
                return SW_ERR;
            } else {
                goto _continue;
            }
        } else if (n == 0) {
            reactor_->execute_end_callbacks(true);
            SW_REACTOR_CONTINUE;
        }
        for (i = 0; i < n; i++) {
            event.reactor_id = reactor_id;
            event.socket = (Socket *) events_[i].data.ptr;
            event.type = event.socket->fd_type;
            event.fd = event.socket->fd;

            if (events_[i].events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) {
                event.socket->event_hup = 1;
            }
            // read 读事件,这里的 handler 对应 readable_event_callback
            if ((events_[i].events & EPOLLIN) && !event.socket->removed) {
                handler = reactor_->get_handler(SW_EVENT_READ, event.type);
                ret = handler(reactor_, &event);
                if (ret < 0) {
                    swoole_sys_warning("EPOLLIN handle failed. fd=%d", event.fd);
                }
            }
            // write 写事件,这里的 handler 对应 writable_event_callback
            if ((events_[i].events & EPOLLOUT) && !event.socket->removed) {
                handler = reactor_->get_handler(SW_EVENT_WRITE, event.type);
                ret = handler(reactor_, &event);
                if (ret < 0) {
                    swoole_sys_warning("EPOLLOUT handle failed. fd=%d", event.fd);
                }
            }
            // error 错误处理,这里的 handler 对应 error_event_callback
            if ((events_[i].events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) && !event.socket->removed) {
                // ignore ERR and HUP, because event is already processed at IN and OUT handler.
                if ((events_[i].events & EPOLLIN) || (events_[i].events & EPOLLOUT)) {
                    continue;
                }
                handler = reactor_->get_error_handler(event.type);
                ret = handler(reactor_, &event);
                if (ret < 0) {
                    swoole_sys_warning("EPOLLERR handle failed. fd=%d", event.fd);
                }
            }
            if (!event.socket->removed && (event.socket->events & SW_EVENT_ONCE)) {
                reactor_->_del(event.socket);
            }
        }

    _continue:
        reactor_->execute_end_callbacks(false);
        SW_REACTOR_CONTINUE;
    }
    return 0;
}

总结

  • 协程又称轻量级线程,协程是用户级线程;不需要操作系统参与,创建切换成本低。
  • Swoole 中的协程是无法利用多核 CPU 的,如果想利用多核 CPU 则需要依赖 Swoole 的多进程模型。
  • Swoole 中协程的是利用的 Event 事件循环进行调度的,将遇到 IO 操作的 Socket 统一加入到事件循环中。
  • 本次的源码分析旨在了解整个协程在 Swoole 中的运行逻辑,打开我们的思路,便于我们更好的体会到协程所带来的高性能价值。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/510613.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

回顾快速排序

快速排序 快速排序的核心&#xff1a; 找到一个key 通常左边的数比key小&#xff0c;右边的数比key大。 找key通常有三种方法&#xff1a; 1. 挖坑法&#xff1a; 代码实现&#xff1a; // int _pivot(int* a, int left, int right) {int begin left, end right;int in…

动态图学习新突破!最新SOTA实现性能全面升级,效率与精度兼得

现实世界中的许多图数据是动态变化的&#xff0c;比如社交网络、交通流量等。而传统的图学习方法通常处理的是静态图&#xff0c;这就导致它缺乏处理动态变化的能力&#xff0c;在适应性方面存在局限性。 相较之下&#xff0c;动态图学习能够捕捉到图数据的动态变化&#xff0…

MuJoCo 入门教程(一)

系列文章目录 前言 一、简介 MuJoCo 是多关节接触动力学&#xff08;Multi-Joint dynamics with Contact&#xff09;的缩写。它是一个通用物理引擎&#xff0c;旨在促进机器人、生物力学、图形和动画、机器学习以及其他需要快速、准确地仿真铰接结构与环境交互的领域的研究和开…

ssm016基于 Java Web 的校园驿站管理系统+jsp

校园驿站管理系统的设计与实现 摘 要 互联网发展至今&#xff0c;无论是其理论还是技术都已经成熟&#xff0c;而且它广泛参与在社会中的方方面面。它让信息都可以通过网络传播&#xff0c;搭配信息管理工具可以很好地为人们提供服务。针对校园快递信息管理混乱&#xff0c;出…

阿里云优惠券如何领取使用?

阿里云是阿里巴巴旗下云计算及人工智能科技公司&#xff0c;提供云服务器、云数据库、云存储等云计算服务和云解决方案。为了吸引更多的用户&#xff0c;阿里云经常推出各种优惠活动&#xff0c;其中就包括阿里云优惠券。本文将为大家详细介绍阿里云优惠券领取方法及使用教程&a…

Nginx 基础

文章目录 Nginx概念安装下载上传安装包执行准备条件指定安装位置编译和安装启动服务创建启动脚本 linux文件目录nginx运行原理nginx配置域名概念和原理域名配置 Nginx 概念 Nginx 是一个高性能的HTTP和反向代理web服务器&#xff0c;同时也提供了IMAP/POP3/SMTP服务。Nginx是…

211基于matlab的多类结构动力学

基于matlab的多类结构动力学&#xff0c;凸轮机构、双凸轮、弦振动模拟、阻尼振动 、四连杆机构 、套杆运动 、三根弹簧作用的振子。程序已调通&#xff0c;可直接运行。 211 matlab 结构动力学 根弹簧作用的振子 - 小红书 (xiaohongshu.com)

javaweb学习(day10-服务器渲染技术)

一、基本介绍 1.前言 目前主流的技术是 前后端分离 (比如: Spring Boot Vue/React)JSP 技术使用在逐渐减少&#xff0c;但使用少和没有使用是两个意思&#xff0c;一些老项目和中小公司还在使用 JSP&#xff0c;工作期间&#xff0c;你很有可能遇到 JSPJSP 使用在减少(但是现…

Python深度学习034:cuda的环境如何配置

文章目录 1.安装nvidia cuda驱动CMD中看一下cuda版本:下载并安装cuda驱动2.创建虚拟环境并安装pytorch的torch_cuda3.测试附录1.安装nvidia cuda驱动 CMD中看一下cuda版本: 注意: 红框的cuda版本,是你的显卡能装的最高的cuda版本,所以可以选择低于它的版本。比如我的是11…

人工智能|深度学习——基于Xception算法模型实现一个图像分类识别系统

一、Xception简介 在计算机视觉领域&#xff0c;图像识别是一个非常重要的任务&#xff0c;其应用涵盖了人脸识别、物体检测、场景理解等众多领域。随着深度学习技术的发展&#xff0c;深度卷积神经网络&#xff08;Convolutional Neural Networks&#xff0c;简称CNN&#xff…

阿赵UE学习笔记——24、动画播放控制

阿赵UE学习笔记目录   大家好&#xff0c;我是阿赵。   继续学习虚幻引擎的使用。关于UE的动画系统&#xff0c;之前学习了很多&#xff0c;包括动画合成或者动画蒙太奇等&#xff0c;实际上最后得到的都是一个动画片段。那么这些动画片段&#xff0c;是需要怎样播放控制呢…

乐观锁解决超卖问题

3.6 乐观锁解决超卖问题 修改代码方案一、 VoucherOrderServiceImpl 在扣减库存时&#xff0c;改为&#xff1a; boolean success seckillVoucherService.update().setSql("stock stock -1") //set stock stock -1.eq("voucher_id", voucherId).eq(&q…

STM32-02基于HAL库(CubeMX+MDK+Proteus)GPIO输出案例(LED流水灯)

文章目录 一、功能需求分析二、Proteus绘制电路原理图三、STMCubeMX 配置引脚及模式&#xff0c;生成代码四、MDK打开生成项目&#xff0c;编写HAL库的GPIO输出代码五、运行仿真程序&#xff0c;调试代码 一、功能需求分析 在完成开发环境搭建之后&#xff0c;开始使用STM32GP…

TCP和UDP区别和使用场景

TCP 和 UDP 是计算机⽹络中两种常⽤的传输层协议&#xff0c;⽤于实现可靠传输和⽆连接传输。 TCP TCP&#xff08;Transmission Control Protocol&#xff09;是⼀种⾯向连接的、可靠的传输协议。它通过三次握⼿四次挥⼿进⾏连接和断开链接&#xff0c;保证数据的可靠性、…

H5类似Word文档输入框小记

最近一个需求在客户端编辑输入超长文本带下划线。 最开始的input、textarea无法像span一样换行pass了。柳暗无天日之际&#xff0c;被投喂了一个contenteditable 。试了一下&#xff0c;嗯... 乌龟看绿豆--对眼了。 div 加上 contenteditable 后便继承了inputEvent 开启输入模…

【“状态机” 解析UART不定长度的协议帧】

【“状态机” 解析UART不定长度的协议帧】 1. 数据帧格式2. 状态机原理3. 代码实现 通信设计中考虑协议的灵活性&#xff0c;经常把协议设计成“不定长度”。如果一个系统接收上述“不定长度”的协议帧&#xff0c;将会有一个挑战–如何高效接收与解析。一个实例如下图&#xf…

流量卡VS随身WIFI?手把手教你怎么选!流量卡和随身WiFi哪个好?流量卡和随身WiFi的区别!流量卡和随身WiFi哪个更划算?流量卡和随身WiFi怎么选?

出门在外&#xff0c;网络、流量已经成为了我们必不可少需要考虑的问题&#xff01;在选择如何获取大流量时&#xff0c;很多人都选择困难&#xff1a;是选择一张流量卡&#xff0c;还是一个随身WIFI&#xff1f; 今天&#xff0c;将从功能与形态、信号、适用场景、限制条件等多…

初阶数据结构—算法的时间复杂度和空间复杂度

第一章&#xff1a;数据结构前言&#xff08;Lesson 1&#xff09; 1. 什么是数据结构&#xff1f; 数据结构 (Data Structure) 是计算机存储、组织数据的方式&#xff0c;指相互之间存在一种或多种特定关系的 数据元素的集合。 2. 什么是算法&#xff1f; 算法(Algorithm)…

Linux学习笔记————C 语言版 LED 灯实验

这里写目录标题 一、实验程序编写二、 汇编部分实验程序编写三、C 语言部分实验程序编写四、编译下载验证 汇编 LED 灯实验中&#xff0c;我们讲解了如何使用汇编来编写 LED 灯驱动&#xff0c;实际工作中是很少用到汇编去写嵌入式驱动的&#xff0c;毕竟汇编太难&#xff0c;而…

X射线源电流电压的实际影响

在进行实际实验的时候&#xff0c;感觉X射线电流电压好像对于成像质量的影响差不多&#xff0c;分不清楚了&#xff0c;这里记录一下&#xff0c;还没探索到原因。 80kv 500uA 功率&#xff1a;40W 90kv 300uA 功率&#xff1a;27W 90kev 600uA 110v 300uA