RTMP推流到SRS流媒体服务器消息处理

RTMP推流到SRS流媒体服务器消息处理

  1. SRS和客户端是怎么交换消息的?各个消息有什么作用?
  2. 握手成功后,SRS和客户端进行消息交换,对应wiresharek这部分截图:

image.png

  1. 流程图(之前画的,可能不够详细,后面补充):

srs和客户端建联流程,从connect开始.png

1. 客户端发送connect连接请求到SRS服务端

  1. connect命令,⽤于客户端向服务器发送连接请求。
  2. wiresharek截图:

image.png

1. connect消息解析
  1. Chunk Stream Id:接收端根据相同的chunk stream id拼装出message, 这⾥是3,对应ffmpeg的枚举RTMP_SYSTEM_CHANNEL。
  2. Type ID (即是message type id) ⽐如8⾳频,9视频,20命令消息,这⾥是20命令消息,对应ffmpeg的枚举RTMP_PT_INVOKE。
  3. Stream ID为0。
  4. RTMP body是⼀个connect 命令消息,这些信息是以AMF格式发送的,消息的结构如下:

image.png

  1. 第三个字段中的Command Object中会涉及到很多键值对,使⽤时可以参考协议的官⽅⽂档。消息的回应有两种,_result表示接受连接,_error表示连接失败。
  2. 以下是连接命令对象中使⽤的健值对的描述:

image.png

2. connect消息代码分析
1. 客户端生成connect消息发送到SRS服务器
  1. 对应FFmpeg代码在:
	gen_connect(s, rt)
	static int gen_connect(URLContext *s, RTMPContext *rt)

	//部分代码如下:
    ff_amf_write_string(&p, "connect");
    ff_amf_write_number(&p, ++rt->nb_invokes);
    ff_amf_write_object_start(&p);
    ff_amf_write_field_name(&p, "app");
    ff_amf_write_string2(&p, rt->app, rt->auth_params);
2. SRS服务端接收connect消息并解析
  1. 对应SRS代码在:
	rtmp->connect_app(req)) != srs_success)
srs_error_t SrsRtmpServer::connect_app(SrsRequest* req) {
    srs_error_t err = srs_success;
    
    SrsCommonMessage* msg = NULL;
    SrsConnectAppPacket* pkt = NULL;
    if ((err = expect_message<SrsConnectAppPacket>(&msg, &pkt)) != srs_success) { //解析connect消息,从流式数据->chunk->message->packet
        return srs_error_wrap(err, "expect connect app response");
    }
    SrsAutoFree(SrsCommonMessage, msg);
    SrsAutoFree(SrsConnectAppPacket, pkt);
    
    SrsAmf0Any* prop = NULL;
    
    if ((prop = pkt->command_object->ensure_property_string("tcUrl")) == NULL) {
        return srs_error_new(ERROR_RTMP_REQ_CONNECT, "invalid request without tcUrl");
    }
    req->tcUrl = prop->to_str();
    
    if ((prop = pkt->command_object->ensure_property_string("pageUrl")) != NULL) {
        req->pageUrl = prop->to_str();
    }
    
    if ((prop = pkt->command_object->ensure_property_string("swfUrl")) != NULL) {
        req->swfUrl = prop->to_str();
    }
    
    if ((prop = pkt->command_object->ensure_property_number("objectEncoding")) != NULL) {
        req->objectEncoding = prop->to_number();
    }
    
    if (pkt->args) {
        srs_freep(req->args);
        req->args = pkt->args->copy()->to_object();
    }
    
    srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->stream, req->port, req->param);
    req->strip();
    
    return err;
}
  1. 具体解析为packet代码为:
srs_error_t SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsBuffer* stream, SrsPacket** ppacket)
{
	...        
        // decode command object.
        if (command == RTMP_AMF0_COMMAND_CONNECT) { //第一个接收到的message为"connect"消息
            *ppacket = packet = new SrsConnectAppPacket();
            return packet->decode(stream);
        } 
    ...
}
  1. gdb显示调用栈为:
(gdb) bt
#0  SrsProtocol::do_decode_message (this=0xac1280, header=..., stream=0xadb560, ppacket=0xadb550) at src/protocol/srs_rtmp_stack.cpp:700
#1  0x00000000004655ec in SrsProtocol::decode_message (this=0xac1280, msg=0xade990, ppacket=0xadb5e0) at src/protocol/srs_rtmp_stack.cpp:430
#2  0x000000000047f8c0 in SrsProtocol::expect_message<SrsConnectAppPacket> (this=0xac1280, pmsg=0xadb670, ppacket=0xadb678) at src/protocol/srs_rtmp_stack.hpp:335
#3  0x000000000047dc7d in SrsRtmpServer::expect_message<SrsConnectAppPacket> (this=0xac1250, pmsg=0xadb670, ppacket=0xadb678) at src/protocol/srs_rtmp_stack.hpp:776
#4  0x000000000046df7b in SrsRtmpServer::connect_app (this=0xac1250, req=0xacab30) at src/protocol/srs_rtmp_stack.cpp:2310
#5  0x00000000004d3594 in SrsRtmpConn::do_cycle (this=0xac0f10) at src/app/srs_app_rtmp_conn.cpp:170
#6  0x00000000004d1d99 in SrsConnection::cycle (this=0xac0f88) at src/app/srs_app_conn.cpp:171
#7  0x000000000050ab08 in SrsSTCoroutine::cycle (this=0xac11f0) at src/app/srs_app_st.cpp:198
#8  0x000000000050ab7d in SrsSTCoroutine::pfn (arg=0xac11f0) at src/app/srs_app_st.cpp:213
#9  0x00000000005bed1a in _st_thread_main () at sched.c:337
#10 0x00000000005bf492 in st_thread_create (start=0x5be696 <_st_vp_schedule+170>, arg=0x700000001, joinable=1, stk_size=1) at sched.c:616
Backtrace stopped: previous frame inner to this frame (corrupt stack?)

2. SRS服务器发送Window Acknowledgement Size消息给客户端,默认2500000

1. Window Acknowledgement Size消息解析
  1. wiresharek截图:

image.png

  1. Type Id:0x5 对应ffmepg的RTMP_PT_WINDOW_ACK_SIZE
  2. Window Acknowledgement Size⽤于设置窗⼝确认⼤⼩,⽐如这⾥是服务器发送给客户端的,当客户端收到每次该size就要Acknowledgement (0x3)。
  3. 对于ffmpeg,在接收到Window Acknowledgement Size的⼀半后发送确认包(Acknowledgement),以确保对等⽅可以继续发送⽽不等待确认。
  4. 注意点:
    1. 客户端作为推流端时,⼀般即使没有收到服务器的ack,客户端也不会停⽌码流的推送。
    2. 当客户端作为拉流端时,⼀般即使拉流端不回应ack,服务器也不会停⽌码流的发送。
    3. 但彼此如果作为接收⽅时,收到1/2Windows size的数据后对会ack对⽅。
2. Window Acknowledgement Size消息代码分析
  1. SRS服务器发送Window Acknowledgement Size消息给客户端
rtmp->set_window_ack_size(out_ack_size)
srs_error_t SrsRtmpServer::set_window_ack_size(int ack_size){
    srs_error_t err = srs_success;
    
    SrsSetWindowAckSizePacket* pkt = new SrsSetWindowAckSizePacket();
    pkt->ackowledgement_window_size = ack_size;
    if ((err = protocol->send_and_free_packet(pkt, 0)) != srs_success) {
        return srs_error_wrap(err, "send ack");
    }
    
    return err;
}
  1. FFmpeg客户端处理SRS服务器发送过来的
static int handle_window_ack_size(URLContext *s, RTMPPacket *pkt)
{
    RTMPContext *rt = s->priv_data;

    if (pkt->size < 4) {
        av_log(s, AV_LOG_ERROR,
               "Too short window acknowledgement size packet (%d)\n",
               pkt->size);
        return AVERROR_INVALIDDATA;
    }

    rt->receive_report_size = AV_RB32(pkt->data);
    if (rt->receive_report_size <= 0) {
        av_log(s, AV_LOG_ERROR, "Incorrect window acknowledgement size %d\n",
               rt->receive_report_size);
        return AVERROR_INVALIDDATA;
    }
    av_log(s, AV_LOG_DEBUG, "Window acknowledgement size = %d\n", rt->receive_report_size);
    // Send an Acknowledgement packet after receiving half the maximum
    // size, to make sure the peer can keep on sending without waiting
    // for acknowledgements.
    rt->receive_report_size >>= 1;

    return 0;
}

3. srs服务器发送Peer Bandiwdth消息给客户端,默认2500000

1. Peer Bandiwdth消息解析
  1. wiresharek截图:

image.png

  1. 客户端或服务器端发送此消息更新对端的输出带宽,作用是限制对端的输出带宽。和Window Acknowledgement Size相⽐,重点是更新
  2. Type ID:0x6 对应ffmpeg的RTMP_PT_SET_PEER_BW
  3. 如果消息中的Window ACK Size与上⼀次发送给发送端的size不同的话要回馈⼀个Window Acknowledgement Size的控制消息
    1. image.png
      1. Hard(Limit Type=0):接收端应该将Window Ack Size设置为消息中的值
      2. Soft(Limit Type=1):接收端可以将Window Ack Size设为消息中的值,也可以保存原来的值(前提是原来的Size⼩与该控制消息中的Window Ack Size)
      3. Dynamic(Limit Type=2):如果上次的Set Peer Bandwidth消息中的Limit Type为0,本次也按Hard处理,否则忽略本消息,不去设置Window Ack Size
2. Peer Bandiwdth消息代码分析
  1. SRS服务器发送 Peer Bandiwdth消息给客户端。
srs_error_t SrsRtmpServer::set_peer_bandwidth(int bandwidth, int type)
{
    srs_error_t err = srs_success;
    
    SrsSetPeerBandwidthPacket* pkt = new SrsSetPeerBandwidthPacket();
    pkt->bandwidth = bandwidth;
    pkt->type = type;
    if ((err = protocol->send_and_free_packet(pkt, 0)) != srs_success) {
        return srs_error_wrap(err, "send set peer bandwidth");
    }
    
    return err;
}

4. srs服务器发送Set Chunk Size消息给客户端

1. Set Chunk Size消息解析
  1. wiresharek截图:image.png
  2. Set Chunk Size(Message Type ID=1):设置chunk中Data字段所能承载的最⼤字节数,默认为128B,通信过程中可以通过发送该消息来设置chunk Size的⼤⼩(不得⼩于128B),⽽且通信双⽅会各⾃维护⼀个chunkSize,两端的chunkSize是独⽴的。
    1. ⽐如当A想向B发送⼀个200B的Message,但默认的chunkSize是128B,因此就要将该消息拆分为Data分别为128B和72B的两个chunk发送,如果此时先发送⼀个设置chunkSize为256B的消息,再发送Data为200B的chunk,本地不再划分Message,B接受到Set Chunk Size的协议控制消息时会调整的接受的chunk的Data的⼤⼩,也不⽤再将两个chunk组成为⼀个Message。
    2. 在实际中⼀般会把chunk size设置的很⼤,有的会设置为4096,FFMPEG推流的时候设置的是 60*1000,这样设置的好处是避免了频繁的拆包组包,占⽤过多的CPU。
  3. 以下为代表Set Chunk Size消息的chunk的Data:
    1. image.png
    2. 其中第⼀位必须为0,chunk Size占31个位,最⼤可代表2147483647=0x7FFFFFFF=2 -1,但实际上所有⼤于16777215=0xFFFFFF的值都⽤不上,因为chunk size不能⼤于Message的⻓度,表示Message的⻓度字段是⽤3个字节表示的,最⼤只能为0xFFFFFF。
2. Set Chunk Size消息代码分析
  1. SRS服务器向客户端发送Set Chunk Size消息
srs_error_t SrsRtmpServer::set_chunk_size(int chunk_size)
{
    srs_error_t err = srs_success;
    
    SrsSetChunkSizePacket* pkt = new SrsSetChunkSizePacket();
    pkt->chunk_size = chunk_size;
    if ((err = protocol->send_and_free_packet(pkt, 0)) != srs_success) {
        return srs_error_wrap(err, "send set chunk size");
    }
    
    return err;
}
  1. FFmpeg客户端处理SRS服务器发送过来的Set Chunk Size消息
static int handle_chunk_size(URLContext *s, RTMPPacket *pkt)
{
    RTMPContext *rt = s->priv_data;
    int ret;

    if (pkt->size < 4) {
        av_log(s, AV_LOG_ERROR,
               "Too short chunk size change packet (%d)\n",
               pkt->size);
        return AVERROR_INVALIDDATA;
    }

    if (!rt->is_input) {
        /* Send the same chunk size change packet back to the server,
         * setting the outgoing chunk size to the same as the incoming one. */
        if ((ret = ff_rtmp_packet_write(rt->stream, pkt, rt->out_chunk_size,
                                        &rt->prev_pkt[1], &rt->nb_prev_pkt[1])) < 0)
            return ret;
        rt->out_chunk_size = AV_RB32(pkt->data);
    }

    rt->in_chunk_size = AV_RB32(pkt->data);
    if (rt->in_chunk_size <= 0) {
        av_log(s, AV_LOG_ERROR, "Incorrect chunk size %d\n",
               rt->in_chunk_size);
        return AVERROR_INVALIDDATA;
    }
    av_log(s, AV_LOG_DEBUG, "New incoming chunk size = %d\n",
           rt->in_chunk_size);

    return 0;
}

5. SRS服务器发送response_connect_app消息给客户端

  1. SRS服务器是会发送response_connect_app消息给客户端用来响应客户端发送的connect消息,但这部分wiresharek是没有抓取到此包的。
  2. 通过tcpdump命令查看包信息:
sudo tcpdump -i any port 1935 -XX and dst 36.112.32.2 #不是公网地址
  1. 是有返回result消息的,只是wiresharek没有抓取到此包。

image.png

1. response_connect_app消息代码分析
  1. SRS服务器向客户端发送esponse_connect_app消息
srs_error_t SrsRtmpServer::response_connect_app(SrsRequest *req, const char* server_ip)
{
    srs_error_t err = srs_success;
    
    SrsConnectAppResPacket* pkt = new SrsConnectAppResPacket();

    // @remark For windows, there must be a space between const string and macro.
    pkt->props->set("fmsVer", SrsAmf0Any::str("FMS/" RTMP_SIG_FMS_VER));
    pkt->props->set("capabilities", SrsAmf0Any::number(127));
    pkt->props->set("mode", SrsAmf0Any::number(1));
    
    pkt->info->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus));
    pkt->info->set(StatusCode, SrsAmf0Any::str(StatusCodeConnectSuccess));
    pkt->info->set(StatusDescription, SrsAmf0Any::str("Connection succeeded"));
    pkt->info->set("objectEncoding", SrsAmf0Any::number(req->objectEncoding));
    SrsAmf0EcmaArray* data = SrsAmf0Any::ecma_array();
    pkt->info->set("data", data);
    
    data->set("version", SrsAmf0Any::str(RTMP_SIG_FMS_VER));
    data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY));
    data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_SERVER));
    data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE));
    data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL));
    data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION));

    if (server_ip) {
        data->set("srs_server_ip", SrsAmf0Any::str(server_ip));
    }
    // for edge to directly get the id of client.
    data->set("srs_pid", SrsAmf0Any::number(getpid()));
    data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id()));
    
    if ((err = protocol->send_and_free_packet(pkt, 0)) != srs_success) {
        return srs_error_wrap(err, "send connect app response");
    }
    
    return err;
}

6. 客户端发送Set Chunk Size消息给SRS服务器

  1. wiresharek截图:

image.png

  1. 客户端也会发送Set Chunk Size消息给SRS服务器,值为60000。

7. 客户端发送releaseStream、FCPublish、CreateStream和checkbw消息给SRS服务器

  1. 客户端还需要发送 releaseStream , FCPublish,CreateStream和checkbw消息给SRS服务器,具体可以参考ffmpeg的handle_invoke_result函数和gen_check_bw函数。
  2. handle_invoke_result函数部分代码:
static int handle_invoke_result(URLContext *s, RTMPPacket *pkt)
{
    ...
        
    if (!strcmp(tracked_method, "connect")) {
        if (!rt->is_input) { //输出
            if ((ret = gen_release_stream(s, rt)) < 0) //生成releaseStream发送给服务器
                goto fail;

            if ((ret = gen_fcpublish_stream(s, rt)) < 0)//生成FCPublish发送给服务器
                goto fail;
        } else {    //输入
            if ((ret = gen_window_ack_size(s, rt)) < 0)
                goto fail;
        }

        if ((ret = gen_create_stream(s, rt)) < 0)//生成createStream发送给服务器
            goto fail;

        ...
            
    } else if (!strcmp(tracked_method, "createStream")) {
        ...
    }

fail:
    av_free(tracked_method);
    return ret;
}
1. 客户端发送releaseStream消息给srs服务器
  1. 对应的值liveStream是要处理的流,作用是生成’releaseStream’调用并发送到服务器,让服务器为媒体流释放一些通道。
  2. wiresharek截图:

image.png

1. releaseStream消息代码分析
  1. 客户端生成releaseStream消息发送给SRS服务端,代码:
static int gen_release_stream(URLContext *s, RTMPContext *rt)
{
    RTMPPacket pkt;
    uint8_t *p;
    int ret;

    if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE,
                                     0, 29 + strlen(rt->playpath))) < 0)
        return ret;

    av_log(s, AV_LOG_DEBUG, "Releasing stream...\n");
    p = pkt.data;
    ff_amf_write_string(&p, "releaseStream");
    ff_amf_write_number(&p, ++rt->nb_invokes);
    ff_amf_write_null(&p);
    ff_amf_write_string(&p, rt->playpath);

    return rtmp_send_packet(rt, &pkt, 1);
}
  1. SRS服务端接收到后会解析成对应的packet,然后返回_result消息。如下wiresharek所示:

image.png

  1. SRS服务器解析releaseStream消息代码:
    // SrsProtocol::do_decode_message函数下
	} else if (command == RTMP_AMF0_COMMAND_RELEASE_STREAM) {
        *ppacket = packet = new SrsFMLEStartPacket(); // 处理"releaseStream"消息,rtmp推流时是这个包
        return packet->decode(stream);
    }
  1. 调用栈:
    1. 是从SrsRtmpServer::identify_client函数进去的
Breakpoint 3, SrsProtocol::do_decode_message (this=0xac1280, header=..., stream=0xadb540, ppacket=0xadb530) at src/protocol/srs_rtmp_stack.cpp:712
712	            *ppacket = packet = new SrsFMLEStartPacket();
(gdb) bt
#0  SrsProtocol::do_decode_message (this=0xac1280, header=..., stream=0xadb540, ppacket=0xadb530) at src/protocol/srs_rtmp_stack.cpp:712
#1  0x00000000004655ec in SrsProtocol::decode_message (this=0xac1280, msg=0xadce30, ppacket=0xadb5c8) at src/protocol/srs_rtmp_stack.cpp:430
#2  0x00000000004702bb in SrsRtmpServer::identify_client (this=0xac1250, stream_id=1, type=@0xacab08: SrsRtmpConnUnknown, stream_name="", duration=@0xacac88: -1) at src/protocol/srs_rtmp_stack.cpp:2521
#3  0x00000000004d519a in SrsRtmpConn::stream_service_cycle (this=0xac0f10) at src/app/srs_app_rtmp_conn.cpp:440
#4  0x00000000004d4ddf in SrsRtmpConn::service_cycle (this=0xac0f10) at src/app/srs_app_rtmp_conn.cpp:388
#5  0x00000000004d3ba7 in SrsRtmpConn::do_cycle (this=0xac0f10) at src/app/srs_app_rtmp_conn.cpp:209
#6  0x00000000004d1d99 in SrsConnection::cycle (this=0xac0f88) at src/app/srs_app_conn.cpp:171
#7  0x000000000050ab08 in SrsSTCoroutine::cycle (this=0xac11f0) at src/app/srs_app_st.cpp:198
#8  0x000000000050ab7d in SrsSTCoroutine::pfn (arg=0xac11f0) at src/app/srs_app_st.cpp:213
#9  0x00000000005bed1a in _st_thread_main () at sched.c:337
#10 0x00000000005bf492 in st_thread_create (start=0x5be696 <_st_vp_schedule+170>, arg=0x700000001, joinable=1, stk_size=1) at sched.c:616
Backtrace stopped: previous frame inner to this frame (corrupt stack?)
  1. 根据packet类型返回对应响应,ReleaseStream/PublishStream/FCPublish/FCUnpublish都是用SrsRtmpServer::identify_fmle_publish_client函数返回_result消息。
srs_error_t SrsRtmpServer::identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsRtmpConnType& type, string& stream_name)
{
    srs_error_t err = srs_success;
    
    type = SrsRtmpConnFMLEPublish;
    stream_name = req->stream_name;
    
    // releaseStream response
    if (true) {
        SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(req->transaction_id);
        if ((err = protocol->send_and_free_packet(pkt, 0)) != srs_success) {
            return srs_error_wrap(err, "send releaseStream response");
        }
    }
    
    return err;
}
2. 客户端发送FCPublish消息给srs服务器
  1. FCPublish消息作用是使服务器为接收媒体流做好准备。
  2. wiresharek截图:

image.png

1. FCPublish消息代码解析
  1. 客户端生成FCPublish消息发送给SRS服务器:
static int gen_fcpublish_stream(URLContext *s, RTMPContext *rt)
{
    RTMPPacket pkt;
    uint8_t *p;
    int ret;

    if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE,
                                     0, 25 + strlen(rt->playpath))) < 0)
        return ret;

    av_log(s, AV_LOG_DEBUG, "FCPublish stream...\n");
    p = pkt.data;
    ff_amf_write_string(&p, "FCPublish");
    ff_amf_write_number(&p, ++rt->nb_invokes);
    ff_amf_write_null(&p);
    ff_amf_write_string(&p, rt->playpath);

    return rtmp_send_packet(rt, &pkt, 1);
}
  1. SRS服务器解析FCPublish消息:
    } else if (command == RTMP_AMF0_COMMAND_FC_PUBLISH) {
        *ppacket = packet = new SrsFMLEStartPacket(); //第四个接收到的message为"FCPublish"
        return packet->decode(stream);
    }
  1. 调用栈:
    1. FCPublish、createStream、publish、onFCPublish、onStatus都是通过SrsRtmpServer::start_fmle_publish函数进行处理
Breakpoint 4, SrsProtocol::do_decode_message (this=0xac1280, header=..., stream=0xadb480, ppacket=0xadb470) at src/protocol/srs_rtmp_stack.cpp:715
715	            *ppacket = packet = new SrsFMLEStartPacket();
(gdb) bt
#0  SrsProtocol::do_decode_message (this=0xac1280, header=..., stream=0xadb480, ppacket=0xadb470) at src/protocol/srs_rtmp_stack.cpp:715
#1  0x00000000004655ec in SrsProtocol::decode_message (this=0xac1280, msg=0xadeca0, ppacket=0xadb500) at src/protocol/srs_rtmp_stack.cpp:430
#2  0x000000000047fd40 in SrsProtocol::expect_message<SrsFMLEStartPacket> (this=0xac1280, pmsg=0xadb588, ppacket=0xadb590) at src/protocol/srs_rtmp_stack.hpp:335
#3  0x000000000047dfd9 in SrsRtmpServer::expect_message<SrsFMLEStartPacket> (this=0xac1250, pmsg=0xadb588, ppacket=0xadb590) at src/protocol/srs_rtmp_stack.hpp:776
#4  0x000000000047190c in SrsRtmpServer::start_fmle_publish (this=0xac1250, stream_id=1) at src/protocol/srs_rtmp_stack.cpp:2714
#5  0x00000000004d5e81 in SrsRtmpConn::stream_service_cycle (this=0xac0f10) at src/app/srs_app_rtmp_conn.cpp:530
#6  0x00000000004d4ddf in SrsRtmpConn::service_cycle (this=0xac0f10) at src/app/srs_app_rtmp_conn.cpp:388
#7  0x00000000004d3ba7 in SrsRtmpConn::do_cycle (this=0xac0f10) at src/app/srs_app_rtmp_conn.cpp:209
#8  0x00000000004d1d99 in SrsConnection::cycle (this=0xac0f88) at src/app/srs_app_conn.cpp:171
#9  0x000000000050ab08 in SrsSTCoroutine::cycle (this=0xac11f0) at src/app/srs_app_st.cpp:198
#10 0x000000000050ab7d in SrsSTCoroutine::pfn (arg=0xac11f0) at src/app/srs_app_st.cpp:213
#11 0x00000000005bed1a in _st_thread_main () at sched.c:337
#12 0x00000000005bf492 in st_thread_create (start=0x5be696 <_st_vp_schedule+170>, arg=0x700000001, joinable=1, stk_size=1) at sched.c:616
Backtrace stopped: previous frame inner to this frame (corrupt stack?)
  1. SRS服务器发送_result消息给客户端进行响应FCPublish消息,见如上代码:SrsRtmpServer::identify_fmle_publish_client
3. 客户端发送CreateStream消息给srs服务器
  1. wiresharek截图:

image.png

  1. Create Stream:创建传递具体信息的通道,从⽽可以在这个流中传递具体信息,传输信息单元为Chunk。
  2. 当发送完createStream消息之后,解析服务器返回的消息会得到⼀个stream ID,这个ID也就是以后和服务器通信的 message stream ID,⼀般返回的是1,不固定。
1. CreateStream消息代码解析
  1. 客户端生成CreateStream消息发送给SRS服务器:
static int gen_create_stream(URLContext *s, RTMPContext *rt)
{
    RTMPPacket pkt;
    uint8_t *p;
    int ret;

    av_log(s, AV_LOG_DEBUG, "Creating stream...\n");

    if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE,
                                     0, 25)) < 0)
        return ret;

    p = pkt.data;
    ff_amf_write_string(&p, "createStream");
    ff_amf_write_number(&p, ++rt->nb_invokes);
    ff_amf_write_null(&p);

    return rtmp_send_packet(rt, &pkt, 1);
}
  1. SRS服务器解析CreateStream消息:
        } else if (command == RTMP_AMF0_COMMAND_CREATE_STREAM) {
            *ppacket = packet = new SrsCreateStreamPacket(); //第五个接收到的message为"createStream"消息
            return packet->decode(stream);
        }
  1. SRS服务器发送_result消息给客户端进行响应CreateStream消息,见如上代码:SrsRtmpServer::identify_fmle_publish_client
4. 客户端发送checkbw消息给srs服务器
  1. 客户端生成检查带宽消息发送给服务器。
  2. wiresharek截图:

image.png

1. checkbw消息代码解析
  1. 客户端生成checkbw消息发送给SRS服务器:
static int gen_check_bw(URLContext *s, RTMPContext *rt)
{
    RTMPPacket pkt;
    uint8_t *p;
    int ret;

    if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE,
                                     0, 21)) < 0)
        return ret;

    p = pkt.data;
    ff_amf_write_string(&p, "_checkbw");
    ff_amf_write_number(&p, ++rt->nb_invokes);
    ff_amf_write_null(&p);

    return rtmp_send_packet(rt, &pkt, 1);
}

8. 客户端向SRS服务器发送publish消息以及SRS服务器向客户端发送onFCPublish消息和onStatue消息

1. publish消息解析
  1. 推流客户端使用publish消息向SRS服务器端发布一个命名的流,发布之后,任意客户端都可以通过该名称请求视频、音频和数据。
  2. wiresharek截图:

image.png

1. publish消息代码解析
  1. 客户端生成publish消息并发送到SRS服务器:
static int gen_publish(URLContext *s, RTMPContext *rt)
{
    RTMPPacket pkt;
    uint8_t *p;
    int ret;

    av_log(s, AV_LOG_DEBUG, "Sending publish command for '%s'\n", rt->playpath);

    if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SOURCE_CHANNEL, RTMP_PT_INVOKE,
                                     0, 30 + strlen(rt->playpath))) < 0)
        return ret;

    pkt.extra = rt->stream_id;

    p = pkt.data;
    ff_amf_write_string(&p, "publish");
    ff_amf_write_number(&p, ++rt->nb_invokes);
    ff_amf_write_null(&p);
    ff_amf_write_string(&p, rt->playpath);
    ff_amf_write_string(&p, "live");

    return rtmp_send_packet(rt, &pkt, 1);
}
  1. SRS服务器解析publish消息:
    } else if (command == RTMP_AMF0_COMMAND_PUBLISH) { //message为"publish"
        *ppacket = packet = new SrsPublishPacket();
        return packet->decode(stream);
    }
  1. 调用栈:
(gdb) bt
#0  SrsProtocol::do_decode_message (this=0xac1280, header=..., stream=0xadb480, ppacket=0xadb470) at src/protocol/srs_rtmp_stack.cpp:718
#1  0x00000000004655ec in SrsProtocol::decode_message (this=0xac1280, msg=0xaded60, ppacket=0xadb500) at src/protocol/srs_rtmp_stack.cpp:430
#2  0x00000000004801c0 in SrsProtocol::expect_message<SrsPublishPacket> (this=0xac1280, pmsg=0xadb588, ppacket=0xadb590) at src/protocol/srs_rtmp_stack.hpp:335
#3  0x000000000047e35b in SrsRtmpServer::expect_message<SrsPublishPacket> (this=0xac1250, pmsg=0xadb588, ppacket=0xadb590) at src/protocol/srs_rtmp_stack.hpp:776
#4  0x0000000000471c42 in SrsRtmpServer::start_fmle_publish (this=0xac1250, stream_id=1) at src/protocol/srs_rtmp_stack.cpp:2757
#5  0x00000000004d5e81 in SrsRtmpConn::stream_service_cycle (this=0xac0f10) at src/app/srs_app_rtmp_conn.cpp:530
#6  0x00000000004d4ddf in SrsRtmpConn::service_cycle (this=0xac0f10) at src/app/srs_app_rtmp_conn.cpp:388
#7  0x00000000004d3ba7 in SrsRtmpConn::do_cycle (this=0xac0f10) at src/app/srs_app_rtmp_conn.cpp:209
#8  0x00000000004d1d99 in SrsConnection::cycle (this=0xac0f88) at src/app/srs_app_conn.cpp:171
#9  0x000000000050ab08 in SrsSTCoroutine::cycle (this=0xac11f0) at src/app/srs_app_st.cpp:198
#10 0x000000000050ab7d in SrsSTCoroutine::pfn (arg=0xac11f0) at src/app/srs_app_st.cpp:213
#11 0x00000000005bed1a in _st_thread_main () at sched.c:337
#12 0x00000000005bf492 in st_thread_create (start=0x5be696 <_st_vp_schedule+170>, arg=0x700000001, joinable=1, stk_size=1) at sched.c:616
Backtrace stopped: previous frame inner to this frame (corrupt stack?)
  1. 解析成功后SRS服务器会生成onFCPublish消息返回给客户端。
2. onFCPublish消息解析
  1. wiresharek截图:

image.png

  1. onFCPublish消息是回应publish消息。
1. onFCPublish消息代码解析
  1. SRS服务器生成onFCPublish消息并发送到客户端:
    // publish response onFCPublish(NetStream.Publish.Start)
    if (true) { //客户端发送publish消息,服务端返回onFCPublish消息。
        SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
        
        pkt->command_name = RTMP_AMF0_COMMAND_ON_FC_PUBLISH;
        pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodePublishStart));
        pkt->data->set(StatusDescription, SrsAmf0Any::str("Started publishing stream."));
        
        if ((err = protocol->send_and_free_packet(pkt, stream_id)) != srs_success) { //onFCPublish
            return srs_error_wrap(err, "send NetStream.Publish.Start");
        }
    }
3. onStatue消息解析
  1. SRS服务器还会生成onStatus消息向客户端发送,描述的状态内容中code为NetStream.Publish.Start,description为Start publishing,目的就是告诉推流客户端,现在可以推流了。
1. onStatue消息代码解析
  1. SRS服务器生成onStatue消息并发送到客户端:
    // publish response onStatus(NetStream.Publish.Start)
    if (true) { //服务器发送onStatus消息给客户端,wiresharek无法解析
        SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
        
        pkt->data->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus));
        pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodePublishStart));
        pkt->data->set(StatusDescription, SrsAmf0Any::str("Started publishing stream."));
        pkt->data->set(StatusClientId, SrsAmf0Any::str(RTMP_SIG_CLIENT_ID));
        
        if ((err = protocol->send_and_free_packet(pkt, stream_id)) != srs_success) {
            return srs_error_wrap(err, "send NetStream.Publish.Start");
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/759823.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

百亿级存储架构: ElasticSearch+HBase 海量存储架构与实现

百亿级存储架构&#xff1a; ElasticSearchHBase 海量存储架构与实现 尼恩&#xff1a;百亿级数据存储架构起源 在40岁老架构师 尼恩的读者交流群(50)中&#xff0c;经常性的指导小伙伴们改造简历。 经过尼恩的改造之后&#xff0c;很多小伙伴拿到了一线互联网企业如得物、阿…

AI模型的奥运会:谁将在OlympicArena中夺冠?

获取本文论文原文PDF&#xff0c;请在公众号【AI论文解读】留言&#xff1a;论文解读 引言&#xff1a;AI模型的奥林匹克级评测 评估和比较不同AI模型的性能始终是一个核心话题。随着技术的不断进步&#xff0c;这些模型在处理复杂任务的能力上有了显著的提升。为了更精确地衡…

springboot汽车租赁管理系统-计算机毕业设计源码08754

目 录 摘 要 第 1 章 引 言 1.1 选题背景和意义 1.2 国内外研究现状 1.3 论文结构安排 第 2 章 系统的需求分析 2.1 系统可行性分析 2.1.1 技术方面可行性分析 2.1.2 经济方面可行性分析 2.1.3 法律方面可行性分析 2.1.4 操作方面可行性分析 2.2 系统功能需求分析…

有了文章生成器,轻易满足你对文章的需求

写文章对于大多数人来说并不轻松&#xff0c;往往一篇文章写作完成是需要消耗一个人的大量时间与精力的&#xff0c;如果想要写的文章特别好&#xff0c;那么还要再花一点时间去进入后期的修改。就没有什么方法让大家轻易的去完成文章写作吗&#xff1f;答案是有的&#xff0c;…

【TB作品】密码锁,ATMEGA128单片机,Proteus仿真

题目 5 &#xff1a;密码锁 使用单片机实现简易密码锁&#xff0c;通过输入密码&#xff0c;实现门锁的开启&#xff08;控制继电器&#xff09;。 具体要求如下&#xff1a; &#xff08;1&#xff09;当输入正确密码后&#xff0c;继电器开启。 &#xff08;2&#xff09;当三…

一元线性回归-R语言

# # 安装包 # install.packages(ggplot2) # library(ggplot2) Sys.setlocale(category LC_ALL, locale English_United States.1252) # Sys.setlocale("LC_ALL","Chinese") x <- c(18, 20, 22, 24, 26, 28, 30) y <- c(26.86, 28.35, 28.87,28.75,…

python格式文件

python小白考后复习 CSV格式文件ini格式文件我们可以读取所有节点还可以输出一个节点下所有键值对组成的元组获取节点下的键对应的值判断节点是否存在添加节点还可以添加键值还可以删除节点 XML格式文件读取若是文件格式存在的xml若是以字符串形式存在的xml获取子标签还有获取子…

Arduino IDE 的安装与esp32项目的创建

1打开官网下载 官网 1-1下载完成后安装即可&#xff0c;会弹出一些按安装提示点击安装 2切换为中文模式 2-1点击Flie&#xff0c;在点击图中高亮的位置&#xff0c;进入 2-2选择语言 3创建esp32项目 3-1在线安装&#xff08;不一定成功&#xff0c;可以一直试&#xff09; …

各维度卷积神经网络内容收录

各维度卷积神经网络内容收录 卷积神经网络&#xff08;CNN&#xff09;&#xff0c;通常是指用于图像分类的2D CNN。但是&#xff0c;现实世界中还使用了其他两种类型的卷积神经网络&#xff0c;即1D CNN和3D CNN。 在1D CNN中&#xff0c;内核沿1个方向移动。1D CNN的输入和…

100张linux C/C++工程师面试高质量图

文章目录 杂项BIOSlinux开机启动流程内核启动流程网络编程网络编程流程tcp状态机三次握手四次断开reactor模型proactor模型select原理poll原理epoll原理文件系统虚拟文件系统文件系统调用阻塞IO非阻塞IO异步IO同步阻塞同步非阻塞IO多路复用进程管理进程状态程序加载内存管理MMU…

ArtTS系统能力-通知的学习(3.1)

上篇回顾&#xff1a; ArtTS语言基础类库-容器类库内容的学习(2.10.2&#xff09; 本篇内容&#xff1a; ArtTS系统能力-通知的学习&#xff08;3.1&#xff09; 一、 知识储备 1. 基础类型通知 按内容分成四类&#xff1a; 类型描述NOTIFICATION_CONTENT_BASIC_TEXT普通文…

基于STM32的智能农业环境监控系统

目录 引言环境准备智能农业环境监控系统基础代码实现&#xff1a;实现智能农业环境监控系统 4.1 数据采集模块4.2 数据处理与分析4.3 控制系统实现4.4 用户界面与数据可视化应用场景&#xff1a;农业环境管理与优化问题解决方案与优化收尾与总结 1. 引言 智能农业环境监控系…

Linux rpm与yum

一、rpm包管理 rpm用于互联网下载包的打包及安装工具&#xff0c;它包含在某些Linux分发版中。它生成具有.RPM扩展名的文件。RPM是RedHat Package Manager (RedHat软件包管理工具&#xff09;的缩写&#xff0c;类似windows的setup.exe&#xff0c;这一文件格式名称虽然打上了R…

技术打包 催化剂浸渍制作方法设备

网盘 https://pan.baidu.com/s/1Bybbyy5qEA2uTUlaELmWwg?pwdepdk 改性加氢处理催化剂载体、催化剂及其制备方法和应用.pdf 水滑石基催化剂在高浓度糖转化到1,2-丙二醇中的应用.pdf 海泡石负载铁锰双金属催化剂及其制备方法和应用.pdf 甘油氢解催化剂及其制备方法和应用.pdf 用…

LeetCode-Leetcode 1120:子树的最大平均值

LeetCode-Leetcode 1120&#xff1a;子树的最大平均值 题目描述&#xff1a;解题思路一&#xff1a;递归解题思路二&#xff1a;0解题思路三&#xff1a;0 题目描述&#xff1a; 给你一棵二叉树的根节点 root&#xff0c;找出这棵树的 每一棵 子树的 平均值 中的 最大 值。 子…

Redis 7.x 系列【10】数据类型之有序集合(ZSet)

有道无术&#xff0c;术尚可求&#xff0c;有术无道&#xff0c;止于术。 本系列Redis 版本 7.2.5 源码地址&#xff1a;https://gitee.com/pearl-organization/study-redis-demo 文章目录 1. 概述2. 常用命令2.1 ZADD2.2 ZCARD2.3 ZSCORE2.4 ZRANGE2.5 ZREVRANGE2.6 ZRANK2.7…

ssm网上旅游信息管理系统-计算机毕业设计源码06975

目 录 摘要 1 绪论 1.1 研究背景 1.2 研究意义 1.3论文结构与章节安排 2 系统分析 2.1 可行性分析 2.2 系统流程分析 2.2.1 数据新增流程 2.2.2 数据删除流程 2.3 系统功能分析 2.3.1 功能性分析 2.3.2 非功能性分析 2.4 系统用例分析 2.5本章小结 3 系统总体设…

【课程总结】Day13(上):使用YOLO进行目标检测

前言 在上一章《【课程总结】Day11&#xff08;下&#xff09;&#xff1a;YOLO的入门使用》的学习中&#xff0c;我们通过YOLO实现了对图片的分类任务。本章的学习内容&#xff0c;将以目标检测为切入口&#xff0c;了解目标检测流程&#xff0c;包括&#xff1a;数据标准、模…

Spring Boot集成jasypt快速入门Demo

1.什么是Jasypt&#xff1f; Jasypt&#xff08;Java Simplified Encryption&#xff09;是一个专注于简化Java加密操作的工具。 它提供了一种简单而强大的方式来处理数据的加密和解密&#xff0c;使开发者能够轻松地保护应用程序中的敏感信息&#xff0c;如数据库密码、API密…

使用NFS网关功能将HDFS挂载到本地系统

HDFS安装教程 HDFS安装教程http://t.csdnimg.cn/2ziFd 使用NFS网关功能将HDFS挂载到本地系统 简介 HDFS提供了基于NFS&#xff08;Network File System&#xff09;的插件&#xff0c;可以对外提供NFS网关&#xff0c;供其它系统挂载使用。 NFS 网关支持 NFSv3&#xff0c;并…