07 | Swoole 源码分析之 Channel 通道模块

原文首发链接:Swoole 源码分析之 Channel 通道模块
大家好,我是码农先森。

引言

通道,用于协程间通讯,支持多生产者协程和多消费者协程。底层自动实现了协程的切换和调度。

通道与 PHP 的 Array 类似,仅占用内存,没有其他额外的资源申请,所有操作均为内存操作,无 IO 消耗。

底层使用 PHP 引用计数实现,无内存拷贝。即使是传递巨大字符串或数组也不会产生额外性能消耗 channel 基于引用计数实现,是零拷贝的。

源码拆解

Channel 通道需要在协程环境中使用,我们先看下面这段代码,使用 new Channel(1) 创建一个 channel 对象,然后在第一个协程中向通道中推送数据,在第二个协程获取到通道内的数据进行消费。

use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use function Swoole\Coroutine\run;

run(function(){
    // 创建 channel 通道对象
    $channel = new Channel(1);
    Coroutine::create(function () use ($channel) {
        for($i = 0; $i < 10; $i++) {
            Coroutine::sleep(1.0);
            // 向通道内推送数据
            $channel->push(['rand' => rand(1000, 9999), 'index' => $i]);
            echo "{$i}\n";
        }
    });
    Coroutine::create(function () use ($channel) {
        while(1) {
            // 从通道中获取数据
            $data = $channel->pop(2.0);
            if ($data) {
                var_dump($data);
            } else {
                assert($channel->errCode === SWOOLE_CHANNEL_TIMEOUT);
                break;
            }
        }
    });
});

在分析源代码之前,我们可以提前看一下源码整体的调用逻辑图,以便我们有个大致的印象。

这段代码主要是在 Swoole 的协程环境中创建 Channel 对象并初始化其容量的逻辑。

// swoole-src/ext-src/swoole-channel.cc:132
static PHP_METHOD(swoole_channel_coro, __construct) {
    zend_long capacity = 1;
	
	// 解析传入的参数
    ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 1)
    Z_PARAM_OPTIONAL
    Z_PARAM_LONG(capacity)
    ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);

    if (capacity <= 0) {
        capacity = 1;
    }

	// 当前对象对应的 ChannelObject 结构体指针
    ChannelObject *chan_t = php_swoole_channel_coro_fetch_object(Z_OBJ_P(ZEND_THIS));
    // 为该通道对象分配新的 Channel 实例,并设置其容量为传入的值。
    chan_t->chan = new Channel(capacity);
    zend_update_property_long(swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("capacity"), capacity);
}

这段代码主要是在 Swoole 的协程环境中向通道中推送数据并对返回结果进行处理的逻辑。

// swoole-src/ext-src/swoole-channel.cc:149
static PHP_METHOD(swoole_channel_coro, push) {
	// 获取当前对象的 Channel 实例
    Channel *chan = php_swoole_get_channel(ZEND_THIS);
    zval *zdata;
    double timeout = -1;

	// 解析传入的参数
    ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 2)
    Z_PARAM_ZVAL(zdata)
    Z_PARAM_OPTIONAL
    Z_PARAM_DOUBLE(timeout)
    ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);

    Z_TRY_ADDREF_P(zdata);
    zdata = sw_zval_dup(zdata);
    // 向通道中推入数据
    if (chan->push(zdata, timeout)) {
        zend_update_property_long(
            swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("errCode"), Channel::ERROR_OK);
        RETURN_TRUE;
    } else {
        zend_update_property_long(
            swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("errCode"), chan->get_error());
        Z_TRY_DELREF_P(zdata);
        efree(zdata);
        RETURN_FALSE;
    }
}

// swoole-src/coroutine/channel.cc:105
bool Channel::push(void *data, double timeout) {
	// 获取当前协程对象 current_co
    Coroutine *current_co = Coroutine::get_current_safe();
    // 如果通道已关闭
    if (closed) {
    	// 设置错误并返回空指针
        error_ = ERROR_CLOSED;
        return false;
    }
    // 如果通道已满或生产者队列不为空,则设置超时消息,并根据传入的超时值添加定时器,等待生产者。
    if (is_full() || !producer_queue.empty()) {
        TimeoutMessage msg;
        msg.error = false;
        msg.timer = nullptr;
        if (timeout > 0) {
            msg.chan = this;
            msg.type = PRODUCER;
            msg.co = current_co;
            // 根据传入的超时值添加定时器
            msg.timer = swoole_timer_add(timeout, false, timer_callback, &msg);
        }

		// 挂起生产者协程
        yield(PRODUCER);

		// 如果设置了定时器,则在超时消息中删除定时器
        if (msg.timer) {
            swoole_timer_del(msg.timer);
        }
   
        // 如果当前协程被取消
        if (current_co->is_canceled()) {
        	// 设置错误并返回空指针
            error_ = ERROR_CANCELED;
            return nullptr;
        }
        
        // 如果发生超时
        if (msg.error) {
        	// 设置错误并返回空指针
            error_ = ERROR_TIMEOUT;
            return nullptr;
        }

        // 如果通道关闭且为空的情况
        if (closed && is_empty()) {
        	// 设置相应的错误并返回空指针。
            error_ = ERROR_CLOSED;
            return nullptr;
        }
    }
    
	// 将数据压入数据队列。
    data_queue.push(data);
    swoole_trace_log(SW_TRACE_CHANNEL, "push data to channel, count=%ld", length());
    
    // 如果消费者队列不为空,则唤醒消费者协程。
    if (!consumer_queue.empty()) {
        Coroutine *co = pop_coroutine(CONSUMER);
        // 恢复消费者协程
        co->resume();
    }
    return true;
}

这段代码主要是在 Swoole 的协程环境中从通道中取出数据并对返回结果进行处理的逻辑。

// swoole-src/ext-src/swoole-channel.cc:175
static PHP_METHOD(swoole_channel_coro, pop) {
	// 获取当前对象的 Channel 实例
    Channel *chan = php_swoole_get_channel(ZEND_THIS);
    // 设置超时变量为-1
    double timeout = -1;
	
	// 解析一个超时参数
    ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 1)
    Z_PARAM_OPTIONAL
    Z_PARAM_DOUBLE(timeout)
    ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);

	// 从通道中取出数据,并返回一个 zval 指针
    zval *zdata = (zval *) chan->pop(timeout);
    // 如果返回的 zval 指针不为空
    if (zdata) {
    	// 将其返回给 PHP 脚本,并释放内存
        RETVAL_ZVAL(zdata, 0, 0);
        efree(zdata);
        zend_update_property_long(
            swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("errCode"), Channel::ERROR_OK);
    } else {
        zend_update_property_long(
            swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("errCode"), chan->get_error());
        RETURN_FALSE;
    }
}

// swoole-src/coroutine/channel.cc:55
void *Channel::pop(double timeout) {
	// 获取当前协程对象 current_co
    Coroutine *current_co = Coroutine::get_current_safe();
    // 如果通道已关闭且为空
    if (closed && is_empty()) {
    	// 设置错误并返回空指针
        error_ = ERROR_CLOSED;
        return nullptr;
    }
    // 如果通道为空或者消费者队列不为空
    if (is_empty() || !consumer_queue.empty()) {
        TimeoutMessage msg;
        msg.error = false;
        msg.timer = nullptr;
        if (timeout > 0) {
            msg.chan = this;
            msg.type = CONSUMER;
            msg.co = current_co;
            // 根据传入的超时值添加定时器
            msg.timer = swoole_timer_add(timeout, false, timer_callback, &msg);
        }

		// 挂起消费者协程
        yield(CONSUMER);

		// 如果设置了定时器,则在超时消息中删除定时器
        if (msg.timer) {
            swoole_timer_del(msg.timer);
        }
   
        // 如果当前协程被取消
        if (current_co->is_canceled()) {
        	// 设置错误并返回空指针
            error_ = ERROR_CANCELED;
            return nullptr;
        }
        
        // 如果发生超时
        if (msg.error) {
        	// 设置错误并返回空指针
            error_ = ERROR_TIMEOUT;
            return nullptr;
        }

        // 如果通道关闭且为空的情况
        if (closed && is_empty()) {
        	// 设置相应的错误并返回空指针。
            error_ = ERROR_CLOSED;
            return nullptr;
        }
    }
    
    // 从数据队列中弹出数据,并返回该数据。
    void *data = data_queue.front();
    data_queue.pop();

    // 如果生产者队列不为空,则唤醒生产者协程
    if (!producer_queue.empty()) {
        Coroutine *co = pop_coroutine(PRODUCER);
        // 恢复到生产者协程
        co->resume();
    }
    return data;
}

这段代码一是针对超时回调处理的处理逻辑,并恢复相关的协程操作。二是实现了协程的挂起操作,并根据不同的类型将当前协程放入不同的队列中,以便后续根据需要恢复执行。

// swoole-src/coroutine/channel.cc:22
void Channel::timer_callback(Timer *timer, TimerNode *tnode) {
    TimeoutMessage *msg = (TimeoutMessage *) tnode->data;
    msg->error = true;
    msg->timer = nullptr;
    if (msg->type == CONSUMER) {
    	// 从消费者队列中移除该协程
        msg->chan->consumer_remove(msg->co);
    } else {
    	// 从生产者队列中移除该协程
        msg->chan->producer_remove(msg->co);
    }
    // 恢复协程
    msg->co->resume();
}

// swoole-src/coroutine/channel.cc:34
void Channel::yield(enum Opcode type) {
	// 获取当前协程
    Coroutine *co = Coroutine::get_current_safe();
    if (type == PRODUCER) {
    	// 将当前协程放入到生产者队列
        producer_queue.push_back(co);
        swoole_trace_log(SW_TRACE_CHANNEL, "producer cid=%ld", co->get_cid());
    } else {
    	// 将当前协程放入到消费者队列
        consumer_queue.push_back(co);
        swoole_trace_log(SW_TRACE_CHANNEL, "consumer cid=%ld", co->get_cid());
    }
    
    // 挂起被取消,则调用该函数
    Coroutine::CancelFunc cancel_fn = [this, type](Coroutine *co) {
        if (type == CONSUMER) {
            consumer_remove(co);
        } else {
            producer_remove(co);
        }
        co->resume();
        return true;
    };

    // 挂起当前协程
    co->yield(&cancel_fn);
}

总结

  1. Channel 通道需要在协程的环境中进行使用,通道是纯内存操作,没有 IO 消耗,非常高效。
  2. 底层使用 Channel::yield 函数实现了协程的自动切换和调度,如果通道处理超时则会自动调用 Channel::timer_callback 函数。
  3. Channel 通道是跨协程直接通信的一大利器,在实际的场景中使用起来十分的便利、高效。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/522622.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

中科大发布Agent-FLAN,微调提升Agent能力

随着大语言模型&#xff08;LLMs&#xff09;在各种自然语言处理任务中取得巨大成功&#xff0c;将这些模型作为智能代理&#xff08;agents&#xff09;使用时&#xff0c;它们与基于API的模型相比仍有不小的差距。如何将代理能力有效地整合到通用的LLMs中&#xff0c;成为了一…

单片机为什么还在用C语言编程?

单片机产品的成本是非常敏感的。因此对于单片机开发来说&#xff0c;最重要的是在极其有限的ROM和RAM中实现最多产品的功能。或者反过来说&#xff0c;实现相同的产品功能&#xff0c;所需要的ROM和RAM越小越好&#xff0c;在开始前我有一些资料&#xff0c;是我根据网友给的问…

C++ 【原型模式】

简单介绍 原型模式是一种创建型设计模式 | 它使你能够复制已有对象&#xff0c;客户端不需要知道要复制的对象是哪个类的实例&#xff0c;只需通过原型工厂获取该对象的副本。 以后需要更改具体的类或添加新的原型类&#xff0c;客户端代码无需改变&#xff0c;只需修改原型工…

Linux(CentOS7)部署 y-api 接口管理平台

目录 前言 前置环境 mongodb node 安装 y-api 部署页面 启动 y-api 基本使用教程 前言 前后端分离时代&#xff0c;前后端通过接口文档来协作开发项目。一般开发过程中&#xff0c;由后端先编写接口文档&#xff0c;然后交付给前端&#xff0c;这时候前后端都根据这个…

C# 委托的基础应用

一、Action 和 Func 的使用。 二、自定义委托&#xff1a; 完整的使用代码示例&#xff1a; 三、委托的一般使用 模板方法&#xff1a; 回调方法&#xff0c;在模板方法的基础上进行添加。

刷题之Leetcode209题(超级详细)

209.长度最小的子数组 力扣题目链接(opens new window)https://leetcode.cn/problems/minimum-size-subarray-sum/ 给定一个含有 n 个正整数的数组和一个正整数 s &#xff0c;找出该数组中满足其和 ≥ s 的长度最小的 连续 子数组&#xff0c;并返回其长度。如果不存在符合条…

EPSON高精度导航陀螺仪XV7001BB

随着道路交通的不断发展&#xff0c;以及城市道路的不断更新&#xff0c;以前走过的路早已物是人非&#xff0c;越来越多的驾驶者不得不借助导航系统才能到达目的地&#xff0c;导航成为了出行必不可少的功能。目前的导航都是基于GPS信号定位&#xff0c;再结合导航内部的地图软…

基于javaJSPssm实现的交通档案管理系统

开发语言&#xff1a;Java 框架&#xff1a;ssm 技术&#xff1a;JSP JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09; 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;eclipse/myeclip…

ThreadLocal核心源码阅读

1. 概述 ThreadLocal为每个使用该变量的线程提供独立的变量副本&#xff0c;因此每一个线程都可以独立地改变自己的副本&#xff0c;而不会影响其他线程。 入门例子&#xff1a; public class ThreadLocalStudy {static ThreadLocal<String> stringThreadLocal new T…

C语言:顺序表专题

目录 一、数据结构之顺序表/链表1.数据结构相关概念1.1什么是数据结构1.2为什么需要数据结构 二、顺序表1.顺序表的概念及结构2.顺序表分类3.动态顺序表的实现 一、数据结构之顺序表/链表 1.数据结构相关概念 1.1什么是数据结构 数据结构是由“数据”和“结构”两词组合而来…

医学图像处理 利用pytorch实现的可用于反传的Radon变换和逆变换

医学图像处理 利用pytorch实现的可用于反传的Radon变换和逆变换 前言代码实现思路实验结果 前言 Computed Tomography&#xff08;CT&#xff0c;计算机断层成像&#xff09;技术作为如今医学中重要的辅助诊断手段&#xff0c;也是医学图像研究的重要主题。如今&#xff0c;随…

Python-VBA函数基础知识-001

一、函数的定义&#xff1a; 函数(Function)是一段可重复使用的代码块&#xff0c;用于执行特定的任务或计算&#xff0c;并可以接受输入参数和返回输出结果。函数可以将复杂的问题分解为更小的子问题&#xff0c;提高代码的可读性和可维护性。 二、函数的组成&#xff1a; 在…

基于单片机电子指南针系统设计

**单片机设计介绍&#xff0c;基于单片机电子指南针系统设计 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于单片机电子指南针系统设计概要主要涵盖了硬件设计、软件设计、磁场传感器选择、数据处理和显示等方面。以下是对该…

记某客户的一次无缝数据迁移

背景 客户需要将 Elasticsearch 集群无缝迁移到移动云&#xff0c;迁移过程要保证业务的最小停机时间。 实现方式 通过采用成熟的 INFINI 网关来进行数据的双写&#xff0c;在集群的切换恢复过程中来记录数据变更&#xff0c;待全量数据恢复之后再追平后面增量数据&#xff…

C++从入门到精通——类的作用域及类的实例化

类的作用域及类的实例化 前言一、类的作用域二、类的实例化引例类是对对象进行描述的示例 一个类可以实例化出多个对象示例 示例 前言 类的作用域是指类中定义的变量和方法的可见性和可访问性范围。在类的内部&#xff0c;所有成员&#xff08;包括属性和方法&#xff09;都具…

快速理解JS中的原型和原型链

快速理解JS中的原型和原型链 在我们学习JS的过程中&#xff0c;我们总会接触到一些词&#xff1a;“原型”&#xff0c;“原型链”。那么今天我就来带大家来学习学习原型和原型链的知识吧&#xff01; 在开始之前&#xff0c;我们明确一下我们接下来想要学习的目标&#xff1a…

【机器学习】K-means聚类算法:原理、应用与优化

一、引言 1、简述聚类分析的重要性及其在机器学习中的应用 聚类分析&#xff0c;作为机器学习领域中的一种无监督学习方法&#xff0c;在数据探索与知识发现过程中扮演着举足轻重的角色。它能够在没有先验知识或标签信息的情况下&#xff0c;通过挖掘数据中的内在结构和规律&a…

使用Springfox Swagger实现API自动生成单元测试

目录 第一步&#xff1a;在pom.xml中添加依赖 第二步&#xff1a;加入以下代码&#xff0c;并作出适当修改 第三步&#xff1a;在application.yaml中添加 第四步&#xff1a;添加注解 第五步&#xff1a;运行成功之后&#xff0c;访问相应网址 另外&#xff1a;还可以导出…

ES学习日记(七)-------Kibana安装和简易使用

前言 首先明确一点&#xff0c;Kibana是一个软件&#xff0c;不是插件。 Kibana 是一款开源的数据分析和可视化平台&#xff0c;它是 Elastic stack 成员之一&#xff0c;设计用于和Elasticsearch 协作。您可以使用 Kibana 对 Elasticsearch 索引中的数据进行搜索&#xff0c;…

python文件打包找不到文件路径

引用&#xff1a;【将Python代码打包成exe可执行文件】 https://www.bilibili.com/video/BV1P24y1o7FY/?p4&share_sourcecopy_web&vd_sourced5811f31a0635dfc69a182c7bf1adb8b 在代码中&#xff0c;我们想读取文件a&#xff0c;一般使用如下方法。 import osdir os…