前面的文章讲到ZLMediaKit转流,提到过RingBuffer
,它是比较核心的数据结构。这篇文章就来讲讲RingBuffer
的用法。
RingBuffer的类体系
RingBuffer是由多个类组成,分为两大功能:存储和数据分发。
存储功能由类RingStorage
实现,分发功能由RingReaderDispactcher
,RingDelegate
,RingReader
)。下面是它们的类图:
RingBuffer
类是"大总管",封装整个体系的功能,提供对外的接口。
数据存储
RingStorage
是数据存储类,它是一个循环队列,有最大容量定义,从尾部插入最新数据,当队列满了,从头部删除老数据。
它的对所存储的数据的定义,借用了视频GOP的概念。
将GOP视为一个元素(一个视频GOP中包含多个视频nalu)。
在RingStorage
中将GOP称为组更适合,组里有包含的更基本的元素,下面是它的定义:
template <typename T>
class _RingStorage
基本元素为模板类型,可以存入任意类型。
它包含了一个类型为GopType
容器,如下:
using GopType = List<List<std::pair<bool, T>>>;
GopType _data_cache;
它是一个List
,元素也是一个List
。可见是以组为单位存储数据。
在视频数据中,GOP包含的是两个关键帧之间的的nalu数据,所以它的write
接口有一个是否为key的参数,如下:
void write(T in, bool is_key = true)
它的构造函数如下:
_RingStorage(size_t max_size, size_t max_gop_size)
max_size
是指最大元素个数,就是GOP的数量*GOP的大小。
max_gop_size
是指最大GOP的个数。
下面是一个使用示例:
//RingBuffer是_RingStorage的封装
//最大size为100,GOP最大个数为1
RingBuffer<int>::Ptr g_ringBuf(new RingBuffer<int>(100,nullptr,1));
//GOP 011
g_ringBuf->write(0,true);
g_ringBuf->write(1,false);
g_ringBuf->write(1,false);
//GOP 022
g_ringBuf->write(0,true);
g_ringBuf->write(2,false);
g_ringBuf->write(2,false);
//GOP 033
g_ringBuf->write(0,true);
g_ringBuf->write(3,false);
g_ringBuf->write(3,false);
上面的例子将0作为key(当然可以是任意值),两个key之间就是GOP的数据(GOP的长度可以是任意长度)。
因为定义的GOP个数为1,所以buffer最终缓存的是0,3,3。前面的0,1,1,0,2,2都被删除了。
对视频nalu数据来说,**RingStorage**
就是一个GOP buffer,缓存最少一个GOP的数据。这样可以保证快速出图。
数据分发
先看RingBuffer的整体结构图
RingBuffer
中的数据结构std::unordered_map<EventPoller::Ptr, typename RingReaderDispatcher::Ptr, HashOfPtr> _dispatcher_map
,是以EventPoller
对象为key,所以它可以跨线程的分发数据。
RingReaderDispatcher
内有多个RingReader
对象,是数据流向的目的端。
RingReader
就是数据的出口,调用RingBuffer
类的attch
方法获取一个RingReader
对象,再调用setReadCB
方法设置数据回调,就可以取到数据了。
attch
有一个EventPoller
类型的形参,需要传入的是目的对象所在的线程。
在这篇文章中提到过,MediaSource
对象作为数据源,内部都有一个RingBuffer,通过它拿到一个RingReader
对象后就可以取到这个MediaSource
的源了。
比如,以rtmp推流,http-flv拉流时,那么连接rtmp源和flv的基本代码结构如下:
//poller为_ring_reader对象所在的线程
_ring_reader = media->getRing()->attach(poller);
//获取源信息的回调
_ring_reader->setGetInfoCB(...);
//当源关闭时的回调
_ring_reader->setDetachCB(...);
//设置读取数据的回调
_ring_reader->setReadCB(...);
具体的代码见,void FlvMuxer::start
方法。
下面是RingBuffer中数据流转图
通过write写入数据,数据从RingBuffer
到RingReaderDispatcher
,再到RingReader
,再通过onReadCB
回调至dst。
RingBuffer
使用的例子
#include <iostream>
#include "Util/logger.h"
#include "Util/util.h"
#include "Util/RingBuffer.h"
using namespace std;
using namespace toolkit;
//创建一个RingBuffer对象,存储int元素
//最大size为100,缓存(max_gop_size)为1
RingBuffer<int>::Ptr g_ringBuf(new RingBuffer<int>(100,nullptr,1));
//数据回调
void onReadEvent1(int i) {
std::cout<<i<<std::endl;
}
//src释放时的回调
void onDetachEvent(){
WarnL;
}
int main() {
//初始化日志
auto fileChannel = std::make_shared<toolkit::FileChannel>("FileChannel", toolkit::exeDir());
Logger::Instance().add(fileChannel);
Logger::Instance().setWriter(std::make_shared<toolkit::AsyncLogWriter>());
//RingBuffer reader线程
auto poller1 = EventPollerPool::Instance().getPoller(false);
//在线程中设置reader
poller1->async([&]{
//通过attach方法获取一个RingReader,设置为使用cache
auto reader = g_ringBuf->attach(poller1,true);
//设置数据读取回调
reader->setReadCB([](int i){
onReadEvent1(i);
});
//设置src关闭时的回调
reader->setDetachCB([](){
onDetachEvent();
});
});
//在主线程中写入数据
//GOP 011
g_ringBuf->write(0,true);
g_ringBuf->write(1,false);
g_ringBuf->write(1,false);
//GOP 022
g_ringBuf->write(0,true);
g_ringBuf->write(2,false);
g_ringBuf->write(2,false);
//GOP 033
g_ringBuf->write(0,true);
g_ringBuf->write(3,false);
g_ringBuf->write(3,false);
std::this_thread::sleep_for(std::chrono::seconds(10));
}