1 基础组件
live项目主要包含了四个基础库、程序入口类(mediaServer)和测试程序(testProgs)。四个基础库是UsageEnvironment、BasicUsageEnvironment、groupsock和liveMedia
UsageEnvironment
抽象了两个类UsageEnvironment和TaskScheduler,UsageEnvironment表示整个运行环境,提供错误记录和输出的功能。TaskScheduler表示任务调度中心,用于异步事件的读取和处理。该库中还有一个抽象类HashTable,这是一个通用的HashTable,在整个项目中都可以使用它。
BasicUsageEnvironment
UsageEnvironment和TaskScheduler的具体实现类
groupsock
网络接口的封装,用于数据包的接收和发送,同时支持多播和单播。groupsock库中包括了GroupEId、Groupsock、GroupsockHelper、NetAddress、NetInterface等类。GroupsockHelper类主要用于读写Socket。
liveMedia
基于基类Medium,实现各种流媒体和编解码类型结构,定义了source(生产者)和sink(消费者)操作。
mediaServer
mediaServer下的live555MediaServer提供了main函数,DynamicRTSPServer继承了RTSPServer并重写了虚函数lookupServerMediaSession
2 主程序
2.1 基本概念和实体
MediaServer是服务器的抽象。
- 它创建用于接受客户端连接的socket。它还是其他实体的容器。
ClientConnection是与客户端的数据连接的抽象。
- 当客户端连接服务器时,MediaServer创建ClientConnection的实例,保存在成员fClientConnections[]中。
MediaSession是媒体的抽象。
- 当用户请求建立媒体连接时,创建MediaSession实例,保存在MediaServer的成员fServerMediaSessions[]中。这是一个Hash表,key值是媒体的名字。
媒体中可以有多个通道,MediaSubsession是媒体通道的抽象。
- 创建MediaSession时,同时为其中的通道创建MediaSubsession实例,保存在成员fSubsessionHead指向的链表中。
ClientSession是与客户端的对话的抽象,承载在ClientConnection上。
- 当客户端请求开始播放时,创建ClientSession实例,保存在成员fClientSessions[]中。这是一个Hash表,key值是全局唯一的session id。
StreamState是ClientSession用于挂接到MediaSubsession的中介。
- ClientSession要挂接到MediaSession上,获得媒体源。它的成员fStreamStates[]引用MediaSession中的MediaSubsession实例。
- fStreamStates[]是一个数组,这里的trackNum指它的索引。
2.2 main函数
main函数创建任务调度器,创建RTSPServer实例,将它的socket置于调度器的监听下,最后运行调度器,处理socket事件。
- 调用BasicTaskScheduler::createNew()创建任务调度器,这是一个BasicTaskScheduler实例。
- 引用调度器实例,创建BasicUsageEnvironment实例。
- 调用DynamicRTSPServer::createNew(),创建RTSPServer实例。
- 调用GenericMediaServer::setUpOurSocket()创建TCP socket。createNew()的参数指定本地端口号,这里先指定端口号554。但绑定可能失败,如果失败了,main()会再次调用createNew(),用端口号8554再重试一次。
- 创建的socket保存在GenericMediaServer的成员fServerSocket中。
- 调用TaskSheduler::turnOnBackgroundReadHandling()将socket置于监听状态,回调函数为GenericMediaServer::incomingConnectionHandler()。
GenericMediaServer
::GenericMediaServer(UsageEnvironment& env, int ourSocketIPv4, int ourSocketIPv6, Port ourPort,
unsigned reclamationSeconds)
: Medium(env),
fServerSocketIPv4(ourSocketIPv4), fServerSocketIPv6(ourSocketIPv6),
fServerPort(ourPort), fReclamationSeconds(reclamationSeconds),
fServerMediaSessions(HashTable::create(STRING_HASH_KEYS)),
fClientConnections(HashTable::create(ONE_WORD_HASH_KEYS)),
fClientSessions(HashTable::create(STRING_HASH_KEYS)),
fPreviousClientSessionId(0),
fTLSCertificateFileName(NULL), fTLSPrivateKeyFileName(NULL) {
ignoreSigPipeOnSocket(fServerSocketIPv4); // so that clients on the same host that are killed don't also kill us
ignoreSigPipeOnSocket(fServerSocketIPv6); // ditto
// Arrange to handle connections from others:
env.taskScheduler().turnOnBackgroundReadHandling(fServerSocketIPv4, incomingConnectionHandlerIPv4, this);
env.taskScheduler().turnOnBackgroundReadHandling(fServerSocketIPv6, incomingConnectionHandlerIPv6, this);
}
void turnOnBackgroundReadHandling(int socketNum, BackgroundHandlerProc* handlerProc, void* clientData) {
setBackgroundHandling(socketNum, SOCKET_READABLE, handlerProc, clientData);
}
void BasicTaskScheduler
::setBackgroundHandling(int socketNum, int conditionSet, BackgroundHandlerProc* handlerProc, void* clientData) {
if (socketNum < 0) return;
#if !defined(__WIN32__) && !defined(_WIN32) && defined(FD_SETSIZE)
if (socketNum >= (int)(FD_SETSIZE)) return;
#endif
FD_CLR((unsigned)socketNum, &fReadSet);
FD_CLR((unsigned)socketNum, &fWriteSet);
FD_CLR((unsigned)socketNum, &fExceptionSet);
if (conditionSet == 0) {
fHandlers->clearHandler(socketNum);
if (socketNum+1 == fMaxNumSockets) {
--fMaxNumSockets;
}
} else {
fHandlers->assignHandler(socketNum, conditionSet, handlerProc, clientData);
if (socketNum+1 > fMaxNumSockets) {
fMaxNumSockets = socketNum+1;
}
if (conditionSet&SOCKET_READABLE) FD_SET((unsigned)socketNum, &fReadSet);
if (conditionSet&SOCKET_WRITABLE) FD_SET((unsigned)socketNum, &fWriteSet);
if (conditionSet&SOCKET_EXCEPTION) FD_SET((unsigned)socketNum, &fExceptionSet);
}
}
- 调用RTSPServer::rtspURLPrefix()得到URL信息,其中调用getourIPAddress()得到本地地址。
- 调用BasicTaskScheduler0::doEventLoop()调度任务。
GenericMediaServer::setUpOurSocket()创建TCP socket。
- 调用setupStreamSocket()创建TCP socket。
- 调用increaseSendBufferTo()重新设置增加缓存大小。
- 调用listen()开始监听。
BasicTaskScheduler0::doEventLoop()消息循环。
- 循环调用BasicTaskScheduler::SingleStep()
- 为所有需要操作的socket执行select。
- 找到第一个socket handler执行。
- 找到第一个响应事件执行。
- 找到第一个延迟任务执行。
2.3 GenericMediaServer::incomingConnectionHandler()
当有新的数据连接请求时,GenericMediaServer::incomingConnectionHandler()被调用。其中调用incomingConnectionHandlerOnSocket(),参数是成员fServerSocket。
- 调用accept(),返回新数据连接的socket,同时得到客户端的地址。
- 调用makeSocketNonBlocking()设置socket为非阻塞模式。
- 调用increaseSendBufferTo()增加发送缓存大小。
- 用新连接的socket和客户端地址,调用RTSPServer::createNewClientConnection()。
在createNewClientConnection()中
- 创建RTSPClientConnection实例。
- ClientConnection的成员fOurSocket保存新连接的socket值,成员fClientAddr保存客户端地址。
- GenericMediaServer的成员fClientConnections->Add保存ClientConnection的实例。
- 调用TaskScheduler::setBackgroundHandling(),将新连接的socket置于监听状态,回调函数为ClientConnection::incomingRequestHandler()。
- 调用RTSPClientConnection::resetRequestBuffer()复位与接收缓存有关的位置指示变量。
2.4 ClientConnection::incomingRequestHandler()
当socket有数据到达时,ClientConnection::incomingRequestHandler()被调用。
- 调用readSocket()读取数据,这是RTSP请求字符串。接收缓存是成员fRequestBuffer[]。
- 调用RTSPClientConnection::handleRequestBytes解析RTSP请求并处理。
- 调用parseRTSPRequestString()解析请求字符串,得到RTSP命令字及参数,包括:cmdName请求命令字(如:OPTIONS),urlSuffix是文件名,cseq是请求序列号。
- 后面根据cmdName,调用不同的函数处理。如handleCmd_OPTIONS()处理OPTIONS请求等。
- 对于SETUP请求,调用GenericMediaServer::createNewClientSessionWithId创建ClientSession实例。
- 对于SETUP之后的请求,parseRTSPRequestString()得到有效的sessionIdStr,调用GenericMediaServer::lookupClientSession()得到对应的ClientSession实例。然后调用RTSPClientSession::handleCmd_withinSession(),再调用相应的处理函数。
if (authenticationOK("SETUP", urlTotalSuffix, (char const*)fRequestBuffer)) {
clientSession
= (RTSPServer::RTSPClientSession*)fOurRTSPServer.createNewClientSessionWithId();
} else {
areAuthenticated = False;
}
if (requestIncludedSessionId) {
clientSession
= (RTSPServer::RTSPClientSession*)(fOurRTSPServer.lookupClientSession(sessionIdStr));
if (clientSession != NULL) clientSession->noteLiveness();
}
else if (strcmp(cmdName, "TEARDOWN") == 0
|| strcmp(cmdName, "PLAY") == 0
|| strcmp(cmdName, "PAUSE") == 0
|| strcmp(cmdName, "GET_PARAMETER") == 0
|| strcmp(cmdName, "SET_PARAMETER") == 0) {
if (clientSession != NULL) {
clientSession->handleCmd_withinSession(this, cmdName, urlPreSuffix, urlSuffix, (char const*)fRequestBuffer);
}
2.5 RTSPClientConnection::handleCmd_OPTIONS
填充回复字符串,告诉客户端支持哪些请求
void RTSPServer::RTSPClientConnection::handleCmd_OPTIONS() {
snprintf((char*)fResponseBuffer, sizeof fResponseBuffer,
"RTSP/1.0 200 OK\r\nCSeq: %s\r\n%sPublic: %s\r\n\r\n",
fCurrentCSeq, dateHeader(), fOurRTSPServer.allowedCommandNames());
}
2.6 RTSPClientConnection::handleCmd_DESCRIBE
- 调用DynamicRTSPServer::lookupServerMediaSession()查找ServerMediaSession实例,因为这时实例还不存在,所以这里会先创建一个。执行回调RTSPClientConnection::handleCmd_DESCRIBE_afterLookup
- 用这个ServerMediaSeesin实例调用generateSDPDescription()。
- 调用ourIPAddress()得到本地地址。
- 构建会话级sdp字符串,本地地址是它的一个组成部分。
- 遍历ServerMediaSession的成员fSubsessionHead,对其中的subsession,调用OnDemandServerMediaSubsession::sdpLines()得到媒体级sdp字符串。
- 连接会话级sdp字符串和媒体级sdp字符串。
- 调用RTSPServer::rtspURL()得到URL信息。这是头部字符串的一部分。
DynamicRTSPServer::lookupServerMediaSession()根据key值,在GenericMediaServer的成员fSerMediaSessions中查找对应的实例。fServerMediaSessesions是一个Hash表。lookupServerMediaSession()实现:
- fopen打开文件,如test.264
- 实例不存在调用全局函数createNewSMS()创建新的MediaSession实例。并调用GenericMediaServer::addServerMediaSession()将它加入成员fServerMediaSessions。
在全局函数createSMS()中
- 从文件名test.264中,解析出扩展名“.264”。根据扩展名做不同处理。 对于扩展名“.264”,NEW_SMS("H.264 Video"),创建ServerMediaSession实例。
- 调用H264VideoFileServerMediaSubsession::createNew(),它创建ServerMediaSubsession实例,文件名保存在成员fFileName中。后面创建FileSource实例时将用到它。
- 调用ServerMediaSession::addSubsession把这个ServerMediaSubsession加入ServerMediaSession的子session表。ServerMediaSession的成员fSubsesisionHead和fSubsessionTail指向这个表的首部和尾部。
OnDemandServerMediaSubsession::sdpLines()得到子session的sdp字符串。sdpLines实现:
- 调用createNewStreamSource()创建FramedSource实例
- 调用createGroupsock()创建Groupsock实例
- 基于FramedSource和Groupsock实例,调用createNewRTPSink()创建RTPSink实例。
- 调用setSDPLinesFromRTPSink()。其中调用RTPSink的接口构建sdp字符串。
- 清理掉以上创建的FramedSource、Groupsock、和RTPSink实例。这时只有ServerMediaSession和ServerMediaSubsession的实例保留下来。
2.7 RTSPClientSession::handleCmd_SETUP
先调用createNewClientSessionWithId()创建ClientSession实例,再调用handleCmd_SETUP()处理。注意handleCmd_SETUP是RTSPClientSession的成员函数,handleCmd_DESCRIBE是RTSPClientConnection的成员函数。
createNewClientSessionWithId()实现:
- 调用our_random32()生成sessionid。
- 用sessionid调用RTSPServer::createNewClientSession()创建RTSPClientSession实例。
- ClientConnection的构造函数调用TaskScheduler::resheduleDelayedTask()设置延时任务,回调函数是ClientSession::LivenessTimeoutTask()。这个函数的作用是在这个Client session实例长时间不工作时,删除它。
- 将这个实例加入GenericMediaServer的成员fClientSessions,这是一个ClientSession实例的Hash表。
RTSPClientSession::handleCmd_SETUP()实现:
- 调用lookupServerMediaSession(),找到对应的ServerMediaSession实例。这个实例在handleCmd_DESCRIBE()中已经创建好了。
- 调用numSubsessions()得到ServerMediaSession中的subsession数量。
- 新建streamState数组fStreamStates[],长度为subsession数量。fStreamStates[]引用ServerMediaSubsession对象,这些对象保存在fSubsessionsHead链表中
- 比对subsession的trackId成员和请求串中的trackId(如"track1"),相同则纪录trackNum。
- 调用parseTransportHeader()从请求串中解析得到传输头中包括的数据流配置信息。如请求串"RTP/AVP;unicast;client_port=65512-65513",代表单播UDP,RTP连接的客户端端口为65512,RTCP连接的客户端端口为65513。
- 调用parseRangeHeader()解析得到点播范围。如果失败,则再调用parsePlayNowHeader()看能不能找到。没有设置点播范围两个函数都返回失败,则从头开始播到尾。
- 用前面找到的trackNum对应的subsession,调用OnDemandServerMediaSubsession::getStreamParameters()创建数据流通道。
- 与subsesssion对应的streamState,它的成员streamToken,是这个函数的一个参数。OnDemandServerMediaSubsession实例将它当做辅助数据结构,它被定义为类StreamState,与streamState只是首字母不同。
- 构造回复字符串。
getStreamParameters()实现:
- 它的参数clientAddress指定了目标地址。这里因为请求中没有包括“destination=xxx”,所以这个地址是0。这时将目标地址指定ClientConnection的客户端地址。
- 调用createNewStreamSource()创建FramedSource实例。
- 创建用于RTP连接和RTCP连接的Groupsock实例。
- 从成员fInitialPortNum指定的起始端口开始,尝试可用的本地端口,作为RTP连接的本地端口。fInitialPortNum在OnDemandServerMediaSubsession的构造函数中,指定缺省值为6970。
- 调用createGroupsock()创建RTP连接的Groupsock实例。
- 如果不复用RTCP连接和RTP连接,则将RTCP连接的端口加1,创建RTCP连接的Groupsock实例。
- 为RTP连接,调用createNewRTPSink()创建RTPSink实例。如:.264格式文件调用H264VideoFileServerMediaSubsession::createNewRTPSink()。
- 基于上述Groupsock和RTPSink实例,创建StreamState实例。
- 创建Destinations实例,保存客户端地址和端口,将Destination实例加入成员fDestinationHashTable中。
2.8 RTSPClientSession::handleCmd_PLAY
实现:
- 调用parseScaleHeader()和parseRangeHeader(),得到播放的起点和终点。
- 遍历成员fNumStreamStates,streamState的成员subsession,调用OnDemandServerMediaSubsession::seekStream(),调整播放位置。
- 遍历成员fNumStreamStates,streamState的成员subsession,调用OnDemandServerMediaSubsession::startStream(),开始播放。
- 构造回复字符串
OnDemandServrMediaSubsession::startStream()实现:
- 根据clientSessionId在成员fDestinationHashTable中查找对应的Destinations实例,其中保存了目标地址。
- 调用StreamState::startPlaying()开始播放。
- 这个StreamState实例是在处理SETUP请求时创建的, streamState结构体的成员streamToken保存了这个实例。
- 调用OnDemandServerMediaSubsession::createRTCP()创建RTCPInstance实例。RTPSink实例和RTCP连接的Groupsock实例作为参数。并调用setAppHandler()和setSpecificRRHandler()设置回调函数。
- 调用MediaSink::startPlaying(),开始解析媒体帧并通过RTP连接发送,同时调用RTCPInstance::sendReport()发送RTCP信息。
接收RTCP信息实现:
- 调用RTCPInstance::setByeHandler设置一个回调函数,当收到bye消息时,该回调函数被调用。回调函数保存在成员fByeHandlerTask。
- 在RTCPInstance构造函数中,调用RTCPInterface::startNetworkReading(),将RTCPInstance::incomingReportHandler()设置为数据可读时的回调函数。
在RTCPInstance::incomingReportHandler()中
- 调用RTCPInterface::handleRead()读取数据到本地缓存。
- 调用processIncomingReport()处理缓存。如果是bye消息,调用fByeHandlerTask()进行处理。
MediaSink::startPlaying()实现:
- 调用MultiFramedRTPSink::continuePlaying(),其实是调用MultiFramedRTPSink::buildAndSendPacket()。
- buildAndSendPacket封装RTP头,调用packFrame()打包帧数据
- packFrame()从source文件读取一帧数据,通过回调afterGettingFrame()返回给sink,其实是调用afterGettingFrame1()。
- afterGettingFrame1()向包中打帧数据,包完成则调用sendPacketIfNecessary()发送包,反之则调用packFrame()从source获取帧数据打包。
- packFrame()从source文件读取一帧数据,通过回调afterGettingFrame()返回给sink,其实是调用afterGettingFrame1()。
- buildAndSendPacket封装RTP头,调用packFrame()打包帧数据
MultiFramedRTPSink中的帧数据和包缓冲区共用一个,只是用一些额外的变量指明缓冲区中属于包的部分以及属于帧数据的部分(包以外的数据叫做overflow data)。