WebRTC服务质量(10)- Pacer机制(02) RoundRobinPacketQueue

WebRTC服务质量(01)- Qos概述
WebRTC服务质量(02)- RTP协议
WebRTC服务质量(03)- RTCP协议
WebRTC服务质量(04)- 重传机制(01) RTX NACK概述
WebRTC服务质量(05)- 重传机制(02) NACK判断丢包
WebRTC服务质量(06)- 重传机制(03) NACK找到真正的丢包
WebRTC服务质量(07)- 重传机制(04) 接收NACK消息
WebRTC服务质量(08)- 重传机制(05) RTX机制
WebRTC服务质量(09)- Pacer机制(01) 流程概述
WebRTC服务质量(10)- Pacer机制(02) RoundRobinPacketQueue
WebRTC服务质量(11)- Pacer机制(03) IntervalBudget
WebRTC服务质量(12)- Pacer机制(04) 向Pacer中插入数据

一、前言:

RoundRobinPacketQueue 是Pacer模块当中一个非常重要的循环队列模块,主要目的是在多流场景中,根据每个流的 优先级 以及流内的 RTP 包的 入队顺序类型 和其他因素,动态决定数据包的发送顺序。

二、类关系图:

2.1、类图:

在这里插入图片描述

关键部分加了注释,便于理解:

  1. 多流支持: 每个流按 SSRC(Synchronization Source Identifier,同步信源标识符)进行区分,每个流维护独立的优先级队列。
  2. 多级优先级调度: 不同流间按优先级调度(基于 StreamPrioKey),单个流内的包按严格的队列顺序管理。
  3. 静态与动态属性结合: 例如,属性包括每个包的优先级(priority)、入队时间(enqueue_time)、重传标志、带宽开销等。
  4. 分组调度: 使用轮询(Round-Robin)机制,从不同流根据流的权重和优先级选取包发送,提高音频、视频和反馈等多种类型数据的实时性。

2.2、重要成员关系:

在这里插入图片描述

  1. stream_priorities当中存放StreamPrioKey和ssrc对应关系;
  2. streams当中存放ssrc和Stream的对应关系;
  3. 这样就建立了StreamPrioKey和Stream之间的关系;
  4. 反过来,也可以通过Stream当中的priority_it找到stream_priorities当中的某一项;

小结下:重要成员变量的功能

成员变量功能
streams_保存所有流,键是流的 ssrc,每条流存有独立队列和优先级信息。
stream_priorities_将所有流按照 StreamPrioKey 排列,用于实现流间优先级调度。
enqueue_times_一个多重集合,保存所有包的入队时间,便于快速找到最早入队的包时间。
size_packets_size_分别记录队列中包的个数和总字节数,动态调整。
transport_overhead_per_packet_计算包传输的额外开销(如包头)。
time_last_updated_用于统计队列的更新时间,辅助计算入队和等待时间等。

三、重要函数:

3.1、Push

Push 会根据包的优先级、流的权重、总队列大小,以及包的类型等,将包插入到对应的流(Stream)队列,并更新其他与队列状态关联的元数据。总体思路如下:

处理一个新包时:

  1. 确定该包所属的流,如果不存在该流,则创建一个 Stream 对象。
  2. 将包包装成为 QueuedPacket 并插入到流的优先级队列。
  3. 更新流的优先级键 StreamPrioKey,并在 stream_priorities_ 中重新排序。
  4. 更新队列元数据(如队列包大小、队列时间等)。
void RoundRobinPacketQueue::Push(int priority,
                                 Timestamp enqueue_time,
                                 uint64_t enqueue_order,
                                 std::unique_ptr<RtpPacketToSend> packet) {
  RTC_DCHECK(packet->packet_type().has_value());
  if (size_packets_ == 0) {
    // Single packet fast-path.
    single_packet_queue_.emplace(
        QueuedPacket(priority, enqueue_time, enqueue_order,
                     enqueue_times_.end(), std::move(packet)));
    UpdateQueueTime(enqueue_time);
    single_packet_queue_->SubtractPauseTime(pause_time_sum_);
    size_packets_ = 1;
    size_ += PacketSize(*single_packet_queue_);
  } else {
    MaybePromoteSinglePacketToNormalQueue();
    Push(QueuedPacket(priority, enqueue_time, enqueue_order,
                      enqueue_times_.insert(enqueue_time), std::move(packet)));
  }
}


void RoundRobinPacketQueue::Push(QueuedPacket packet) {
  auto stream_info_it = streams_.find(packet.Ssrc());
  if (stream_info_it == streams_.end()) {
    stream_info_it = streams_.emplace(packet.Ssrc(), Stream()).first;
    stream_info_it->second.priority_it = stream_priorities_.end();
    stream_info_it->second.ssrc = packet.Ssrc();
  }

  Stream* stream = &stream_info_it->second;

  if (stream->priority_it == stream_priorities_.end()) {
    RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
    stream->priority_it = stream_priorities_.emplace(StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());
  } else if (packet.Priority() < stream->priority_it->first.priority) {
    stream_priorities_.erase(stream->priority_it);
    stream->priority_it = stream_priorities_.emplace(StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());
  }

  if (packet.EnqueueTimeIterator() == enqueue_times_.end()) {
    packet.UpdateEnqueueTimeIterator(enqueue_times_.insert(packet.EnqueueTime()));
  } else {
    UpdateQueueTime(packet.EnqueueTime());
    packet.SubtractPauseTime(pause_time_sum_);

    size_packets_ += 1;
    size_ += PacketSize(packet);
  }

  stream->packet_queue.push(packet);
}
  • 从streams中找packet所属的Ssrc的stream,如果没有,则在streams中插入一项;
  • 查看stream的priority_it是否等于stream_priorities_的end()
    • 如果相等,则在stream_priorities_中插入一项;
    • 否则,如果新包的优先高(注意,优先级数值越小表示优先级越高),则更新其ssrc对应队列的优先级;
  • 更新队列总时长;
  • 入队时间减去暂停时间(一般不会有暂停);
  • 队列总包数+1;
  • 队列总字节大小 = 包的负载大小+Padding大小;
  • 插入到stream中的packet_queue中;

3.2、Pop

Pop 会轮询不同的流并从当前优先级最高的流中取出一个包发送,同时维护包的发送顺序。总体思路如下:

  1. 调用 GetHighestPriorityStream 获取当前优先级最高的流。
  2. 从该流的优先级队列(PriorityPacketQueue)中取出队首包,并更新流的状态(如剩余大小、时间等)。
  3. 如果该流没有剩余包,删除对应的流优先级键。
std::unique_ptr<RtpPacketToSend> RoundRobinPacketQueue::Pop() {
  if (single_packet_queue_.has_value()) {
    RTC_DCHECK(stream_priorities_.empty());
    std::unique_ptr<RtpPacketToSend> rtp_packet(
        single_packet_queue_->RtpPacket());
    single_packet_queue_.reset();
    queue_time_sum_ = TimeDelta::Zero();
    size_packets_ = 0;
    size_ = DataSize::Zero();
    return rtp_packet;
  }

  RTC_DCHECK(!Empty());
  Stream* stream = GetHighestPriorityStream();
  const QueuedPacket& queued_packet = stream->packet_queue.top();

  stream_priorities_.erase(stream->priority_it);

  // Calculate the total amount of time spent by this packet in the queue
  // while in a non-paused state. Note that the |pause_time_sum_ms_| was
  // subtracted from |packet.enqueue_time_ms| when the packet was pushed, and
  // by subtracting it now we effectively remove the time spent in in the
  // queue while in a paused state.
  TimeDelta time_in_non_paused_state =
      time_last_updated_ - queued_packet.EnqueueTime() - pause_time_sum_;
  queue_time_sum_ -= time_in_non_paused_state;

  RTC_CHECK(queued_packet.EnqueueTimeIterator() != enqueue_times_.end());
  enqueue_times_.erase(queued_packet.EnqueueTimeIterator());

  // Update |bytes| of this stream. The general idea is that the stream that
  // has sent the least amount of bytes should have the highest priority.
  // The problem with that is if streams send with different rates, in which
  // case a "budget" will be built up for the stream sending at the lower
  // rate. To avoid building a too large budget we limit |bytes| to be within
  // kMaxLeading bytes of the stream that has sent the most amount of bytes.
  DataSize packet_size = PacketSize(queued_packet);
  stream->size =
      std::max(stream->size + packet_size, max_size_ - kMaxLeadingSize);
  max_size_ = std::max(max_size_, stream->size);

  size_ -= packet_size;
  size_packets_ -= 1;
  RTC_CHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());

  std::unique_ptr<RtpPacketToSend> rtp_packet(queued_packet.RtpPacket());
  stream->packet_queue.pop();

  // If there are packets left to be sent, schedule the stream again.
  RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
  if (stream->packet_queue.empty()) {
    stream->priority_it = stream_priorities_.end();
  } else {
    int priority = stream->packet_queue.top().Priority();
    stream->priority_it = stream_priorities_.emplace(
        StreamPrioKey(priority, stream->size), stream->ssrc);
  }

  return rtp_packet;
}
  • 获得优先级最高的stream;

  • 从stream的packet_queue当中取出第一个包;

  • 将stream在stream_priorities_中的项删除;

    • 思考下虽然包删除了,但是stream还在,为啥要删除?

      答案:因为stream_priorities_ 是multimap,允许出现相同key,也就是说,这个包没有,它的stream优先级全靠它撑着(因为是先pop优先级高的包),所以,现在不应该由它撑着了,换下一个优先级最高的包撑着。

  • 计算Packet入队后距离现在的时间(不包括暂停时间);

  • 将这段时间从队列的总时间减去;

  • enqueue_times_中将Packet的项删除;

  • 总包数减去1;

  • 总字节数减去包的字节数;

  • 将包从stream中的queue中弹出;

  • 如果stream中的队列为空,则令stream的priority_it指向stream_priorities_end();

  • 否则,从stream队列头部取出Packet,将该Packet的priority插入到stream_priorities_中;

四、优先级调度:

  • 每个流有单独的优先级队列(PriorityPacketQueue),保存 QueuedPacket 对象。

  • QueuedPacket 是一个包装类,表示单个 RTP 数据包及其附属信息(例如入队时间、优先级、是否为重传包等)。

  • 列表中的流按 StreamPrioKey 保存在 stream_priorities_ 中,通过此键决定流次序:

    struct StreamPrioKey {
      int priority;    // 流的优先级,数值越低,优先级越高
      DataSize size;   // 数据包大小,用于平衡负载
    };
    

    优先规则:优先级低(priority 值小) > 数据包大小小(size 值小)。

五、轮询调度:

  • 核心逻辑通过 RoundRobin 的方式轮询多个流。但由于流可能有不同的优先级,某些流会被更多次轮询到。
  • 函数 GetHighestPriorityStream 定位当前最高优先级的流,从流对应的队列中取得包然后发送。

六、Stream定义与管理:

每个流由 Stream 类表示,Stream 是该流独有的数据结构,包括:

  • 当前队列状态,如总字节数、包大小和优先级。
  • 内部维护单独的优先级队列(PriorityPacketQueue)。
  • 调度时实时更新优先级,确保新加入高优先级包时能调整队列次序。

七、其他特性:

7.1、重传支持:

  • 标记是否重传的包,在出队时可能依据该标记进行特殊处理。

7.2、时间相关:

  • queue_time_sum_pause_time_sum_ 用于统计包在队列中的存留时间,这对于带宽控制和流量管理很有用。
  • 提供接口如 AverageQueueTime 计算平均队列时间,用于监控流的实时性。

7.3、队列字节限制:

队列有最大可存储的字节数(max_size_),以防止占用过多资源。

7.4、暂停/恢复功能:

可以通过 SetPauseState 暂停或者恢复队列处理。劝你最好别用!!!

八、总结:

RoundRobinPacketQueue 是一个高效的多流、多优先级调度队列,适用于 RTP 媒体数据的分组发送场景。它通过流内、流间的双重调度机制,结合优先级动态提升、统计队列时间和暂停控制等特性,确保在带宽有限的网络环境中最大程度提高数据的实时性和发送效率,是 WebRTC Pacer 模块的核心部分。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/944274.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

硬件设计-时钟振荡器

目录 摘要 壳式晶振 正常工作条件 摘要 本章主要介绍了晶振的分类、各项参数的意义、特点&#xff0c;同时也介绍了时钟抖动的成因、测量 方法、消除措施和典型滤波电路&#xff0c;使得我们可以正确地选择和使用晶振。 壳式晶振 如图 所示&#xff0c;壳式晶振的名字来源于…

Redis基础知识分享(含5种数据类型介绍+增删改查操作)

一、redis基本介绍 1.redis的启动 服务端启动 pythonubuntu:~$ redis-server客户端启动 pythonubuntu:~$ redis-cli <127.0.0.1:6379> exit pythonubuntu:~$ redis-cli --raw //(支持中文的启动方式) <127.0.0.1:6379> exit2.redis基本操作 ping发送给服务器…

sql字段值转字段

表alertlabel中记录变字段 如何用alertlabel表得到下面数据 实现的sql语句 select a.AlertID, (select Value from alertlabel where AlertIDa.AlertID and Labelhost) as host, (select Value from alertlabel where AlertIDa.AlertID and Labeljob) as job from (select …

llamafactory报错:双卡4090GPU,训练qwen2.5:7B、14B时报错GPU显存不足(out of memory),轻松搞定~~~

实际问题场景&#xff1a; 使用llamafactory进行微调qwen2.5 7B和14B的大模型时&#xff0c;会出现out of memory的报错。尝试使用降低batch_size&#xff08;原本是2&#xff0c;现在降到1&#xff09;的方式&#xff0c;可以让qwen2.5:7B跑起来&#xff0c;但时不时会不稳定…

【hackmyvm】hacked靶机wp

tags: HMVrootkitDiamorphine Type: wp 1. 基本信息^toc 文章目录 1. 基本信息^toc2. 信息收集2.1. 端口扫描2.2. 目录扫描2.3. 获取参数 3. 提权 靶机链接 https://hackmyvm.eu/machines/machine.php?vmHacked 作者 sml 难度 ⭐️⭐️⭐️⭐️️ 2. 信息收集 2.1. 端口扫描…

.NET平台用C#通过字节流动态操作Excel文件

在.NET开发中&#xff0c;通过字节流动态操作Excel文件提供了一种高效且灵活的方式处理数据。这种方法允许开发者直接在内存中创建、修改和保存Excel文档&#xff0c;无需依赖直接的文件储存、读取操作&#xff0c;从而提高了程序的性能和安全性。使用流技术处理Excel不仅简化了…

应用层1——C/S、P2P、DNS域名系统

目录 一、网络应用模型 1、C/S 2、p2p模型 二、域名解析系统DNS 1、为什么有DNS系统&#xff1f; 2、域名的特点 3、DNS域名系统原理 4、递归查询、迭代查询 5、常用的根域名与顶级域名 一、网络应用模型 1、C/S 客户/服务器模型 客户请求服务&#xff0c;服务器提供…

【疑难杂症】 HarmonyOS NEXT中Axios库的响应拦截器无法拦截424状态码怎么办?

今天在开发一个HarmonyOS NEXT的应用的时候&#xff0c;发现http接口如果返回的状态码是424时&#xff0c;我在axios中定义的拦截器失效了。直接走到了业务调用的catch中。 问题表现&#xff1a; 我的拦截器代码如下&#xff1a; 解决办法&#xff1a; 先说解决办法&#xff…

在Windows上读写Linux磁盘镜像的一种方法

背景 嵌入式开发中&#xff0c;经常会把系统的Linux磁盘镜像保存到Windows上&#xff0c;以便上传到网盘备份或发送给工厂&#xff0c;但是如果想读取/修改镜像中的某个文件&#xff0c;一般有2种方案&#xff1a; 直接访问 就是用虚拟磁盘软件将镜像文件挂载成磁盘&#xf…

ffmpeg之显示一个yuv照片

显示YUV图片的步骤 1.初始化SDL库 目的&#xff1a;确保SDL库正确初始化&#xff0c;以便可以使用其窗口、渲染和事件处理功能。操作&#xff1a;调用 SDL_Init(SDL_INIT_VIDEO) 来初始化SDL的视频子系统。 2.创建窗口用于显示YUV图像&#xff1a; 目的&#xff1a;创建一个…

Windows下播放文件作为麦克风声源的一种方式

近期测试一种外语的ASR识别成功率&#xff0c;样本素材是懂这门语言的同事录制的mp3文件。测试client端原本是从麦克风拾音生成媒体流的。 这样&#xff0c;就需要想办法把mp3文件转换为测试client的输入声音。物理方式上&#xff0c;可以用一根音频线&#xff0c;把电…

如何在网页端使用 IDE 高效地阅读 GitHub 源码?

如何在网页端使用 IDE 高效地阅读 GitHub 源码&#xff1f; 前言什么是 GitHub1s&#xff1f;使用 GitHub1s 阅读 browser-use 项目源码步骤 1: 打开 GitHub 项目页面步骤 2: 修改 URL 使用 GitHub1s步骤 3: 浏览文件结构步骤 4: 使用代码高亮和智能补全功能步骤 5: 快速跳转和…

Microsoft word@【标题样式】应用不生效(主要表现为在导航窗格不显示)

背景 随笔。Microsoft word 2013基础使用&#xff0c;仅做参考和积累。 问题 Microsoft word 2013&#xff0c;对段落标题文字应用【标题样式】不生效&#xff08;主要表现为在导航窗格不显示&#xff09;。 图1 图2 观察图1和图2&#xff0c;发现图1的文字在应用【标题一】样…

2021.12.28基于UDP同信的相关流程

作业 1、将TCP的CS模型再敲一遍 服务器 #include <myhead.h> #define PORT 8888 #define IP "192.168.124.123" int main(int argc, const char *argv[]) {//创建套接字//绑定本机IP和端口号//监听客户端请求//接收客户端连接请求//收发消息//创建套接字int…

OpenCV和PyQt的应用

1.创建一个 PyQt 应用程序&#xff0c;该应用程序能够&#xff1a; 使用 OpenCV 加载一张图像。在 PyQt 的窗口中显示这张图像。提供四个按钮&#xff08;QPushButton&#xff09;&#xff1a; 一个用于将图像转换为灰度图一个用于将图像恢复为原始彩色图一个用于将图像进行翻…

kibana启动报错:Invalid character in header content [“kbn-name“]

启动时候kibana报错&#xff1a; 打开 kibana配置文件&#xff0c;config/kibana.yml&#xff0c;配置上server.name即可&#xff0c;如下&#xff1a;

Pandas08

Pandas01 Pandas02 Pandas03 Pandas04 Pandas05 Pandas06 Pandas07 文章目录 内容回顾同期群分析1.1 同期群分析概念1.2 案例代码 数据分析报告数据分析工作内容数据分析简历说明用户生命周期标签1 什么是生命周期标签2 如何计算生命周期标签 内容回顾 TGI 偏好分析 TGI 目标…

网页数据的解析提取之Beautiful Soup

前面博客介绍了正则表达式的相关用法&#xff0c;只是一旦正则表达式写得有问题&#xff0c;得到的结果就可能不是我们想要的了。而且每一个网页都有一定的特殊结构和层级关系&#xff0c;很多节点都用id或 class 作区分所以借助它们的结构和属性来提取不也可以吗? 本篇博客我…

电脑缺失sxs.dll文件要怎么解决?

一、文件丢失问题&#xff1a;以sxs.dll文件缺失为例 当你在运行某个程序时&#xff0c;如果系统提示“找不到sxs.dll文件”&#xff0c;这意味着你的系统中缺少了一个名为sxs.dll的动态链接库文件。sxs.dll文件通常与Microsoft的.NET Framework相关&#xff0c;是许多应用程序…

进军AI大模型-环境配置

语言环境配置 合法上网工具&#xff1a; 这个T子试试&#xff0c;一直稳定。走我链接免费用5天: https://wibnm.com/s/ywtc01/pvijpzy python版本&#xff1a; python3.12 Langchain: Introduction | &#x1f99c;️&#x1f517; LangChain v0.3 9月16日升级的版本 pip3…