1、输入参数
static GOptionEntry entries[] = {
{"peer-id", 0, 0, G_OPTION_ARG_STRING, &peer_id, "String ID of the peer to connect to", "ID"},
{"server", 0, 0, G_OPTION_ARG_STRING, &server_url, "Signalling server to connect to", "URL"},
{"disable-ssl", 0, 0, G_OPTION_ARG_NONE, &disable_ssl, "Disable ssl", NULL},
{"remote-offerer", 0, 0, G_OPTION_ARG_NONE, &remote_is_offerer, "Request that the peer generate the offer and we'll answer", NULL},
{NULL},
};
2、解析参数并连接到信令服务器
int main(int argc, char *argv[])
{
GOptionContext *context;
GError *error = NULL;
sigint_setup();
context = g_option_context_new("- gstreamer webrtc sendrecv demo");
g_option_context_add_main_entries(context, entries, NULL);
g_option_context_add_group(context, gst_init_get_option_group());
if (!g_option_context_parse(context, &argc, &argv, &error)){
g_printerr("Error initializing: %s\n", error->message);
return -1;
}
if (!check_plugins())
return -1;
if (!peer_id){
g_printerr("--peer-id is a required argument\n");
return -1;
}
//运行 localhost 服务器时禁用 ssl,因为它可能是具有自签名证书的测试服务器
{
GstUri *uri = gst_uri_from_string(server_url);
if (g_strcmp0("localhost", gst_uri_get_host(uri)) == 0 ||
g_strcmp0("127.0.0.1", gst_uri_get_host(uri)) == 0)
disable_ssl = TRUE;
gst_uri_unref(uri);
}
loop = g_main_loop_new(NULL, FALSE);
connect_to_websocket_server_async();
g_main_loop_run(loop);
g_main_loop_unref(loop);
if (pipe1) {
gst_element_set_state(GST_ELEMENT(pipe1), GST_STATE_NULL);
g_print("Pipeline stopped\n");
gst_object_unref(pipe1);
}
return 0;
}
3、连接到
wss://webrtc.nirbheek.in:8443信令服务器
/* 连接到信令服务器。这是其他一切的入口点 */
static const gchar *server_url = "wss://webrtc.nirbheek.in:8443";
static void
connect_to_websocket_server_async(void)
{
SoupLogger *logger;
SoupMessage *message;
SoupSession *session;
const char *https_aliases[] = {"wss", NULL};
session = soup_session_new_with_options(SOUP_SESSION_SSL_STRICT, !disable_ssl,
SOUP_SESSION_SSL_USE_SYSTEM_CA_FILE, TRUE,
//SOUP_SESSION_SSL_CA_FILE, "/etc/ssl/certs/ca-bundle.crt",
SOUP_SESSION_HTTPS_ALIASES, https_aliases, NULL);
logger = soup_logger_new(SOUP_LOGGER_LOG_BODY, -1);
soup_session_add_feature(session, SOUP_SESSION_FEATURE(logger));
g_object_unref(logger);
message = soup_message_new(SOUP_METHOD_GET, server_url);
g_print("Connecting to server...\n");
/* Once connected, we will register */
soup_session_websocket_connect_async(session, message, NULL, NULL, NULL, (GAsyncReadyCallback)on_server_connected, message);
app_state = SERVER_CONNECTING;
}
4、连接成功后监听消息和关闭事件并注册服务器
static void
on_server_connected(SoupSession *session, GAsyncResult *res,
SoupMessage *msg)
{
GError *error = NULL;
ws_conn = soup_session_websocket_connect_finish(session, res, &error);
if (error)
{
cleanup_and_quit_loop(error->message, SERVER_CONNECTION_ERROR);
g_error_free(error);
return;
}
g_assert_nonnull(ws_conn);
app_state = SERVER_CONNECTED;
g_print("Connected to signalling server\n");
g_signal_connect(ws_conn, "closed", G_CALLBACK(on_server_closed), NULL);
g_signal_connect(ws_conn, "message", G_CALLBACK(on_server_message), NULL);
/* Register with the server so it knows about us and can accept commands */
register_with_server();
}
5、向信令服务器注册
static gboolean
register_with_server(void)
{
gchar *hello;
gint32 our_id;
if (soup_websocket_connection_get_state(ws_conn) !=
SOUP_WEBSOCKET_STATE_OPEN)
return FALSE;
our_id = g_random_int_range(10, 10000);
g_print("Registering id %i with server\n", our_id);
app_state = SERVER_REGISTERING;
// 使用随机整数 ID 向服务器注册。回复将通过 on_server_message() 接收
hello = g_strdup_printf("HELLO %i", our_id);
soup_websocket_connection_send_text(ws_conn, hello);
g_free(hello);
return TRUE;
}
6、告诉信令服务器连接对端peer_id
static gboolean
setup_call(void)
{
gchar *msg;
if (soup_websocket_connection_get_state(ws_conn) != SOUP_WEBSOCKET_STATE_OPEN)
return FALSE;
if (!peer_id)
return FALSE;
g_print("Setting up signalling server call with %s\n", peer_id);
app_state = PEER_CONNECTING;
msg = g_strdup_printf("SESSION %s", peer_id);
soup_websocket_connection_send_text(ws_conn, msg);
g_free(msg);
return TRUE;
}
7、处理回复消息
/* One mega message handler for our asynchronous calling mechanism */
static void
on_server_message(SoupWebsocketConnection *conn, SoupWebsocketDataType type,
GBytes *message, gpointer user_data)
{
// 检测数据是否为文本类型
gchar *text;
switch (type) {
case SOUP_WEBSOCKET_DATA_BINARY:
g_printerr("Received unknown binary message, ignoring\n");
return;
case SOUP_WEBSOCKET_DATA_TEXT: {
gsize size;
const gchar *data = g_bytes_get_data(message, &size);
/* Convert to NULL-terminated string */
text = g_strndup(data, size);
break;
}
default:
g_assert_not_reached();
}
// 服务器已接受我们的注册,我们已准备好发送命令
if (g_strcmp0(text, "HELLO") == 0) {
if (app_state != SERVER_REGISTERING) {
cleanup_and_quit_loop("ERROR: Received HELLO when not registering", APP_STATE_ERROR);
goto out;
}
app_state = SERVER_REGISTERED;
g_print("Registered with server\n");
// 要求信令服务器将我们与特定对等点连接起来
if (!setup_call()) {
cleanup_and_quit_loop("ERROR: Failed to setup call", PEER_CALL_ERROR);
goto out;
}
/* Call has been setup by the server, now we can start negotiation */
} else if (g_strcmp0(text, "SESSION_OK") == 0) {
if (app_state != PEER_CONNECTING) {
cleanup_and_quit_loop("ERROR: Received SESSION_OK when not calling", PEER_CONNECTION_ERROR);
goto out;
}
app_state = PEER_CONNECTED;
// 开始谈判(交换 SDP 和 ICE 候选人)
if (!start_pipeline())
cleanup_and_quit_loop("ERROR: failed to start pipeline", PEER_CALL_ERROR);
} else if (g_str_has_prefix(text, "ERROR")) { //处理错误消息
switch (app_state) {
case SERVER_CONNECTING:
app_state = SERVER_CONNECTION_ERROR;
break;
case SERVER_REGISTERING:
app_state = SERVER_REGISTRATION_ERROR;
break;
case PEER_CONNECTING:
app_state = PEER_CONNECTION_ERROR;
break;
case PEER_CONNECTED:
case PEER_CALL_NEGOTIATING:
app_state = PEER_CALL_ERROR;
break;
default:
app_state = APP_STATE_ERROR;
}
cleanup_and_quit_loop(text, 0);
/* Look for JSON messages containing SDP and ICE candidates */
} else {
JsonNode *root;
JsonObject *object, *child;
JsonParser *parser = json_parser_new();
if (!json_parser_load_from_data(parser, text, -1, NULL)){
g_printerr("Unknown message '%s', ignoring", text);
g_object_unref(parser);
goto out;
}
root = json_parser_get_root(parser);
if (!JSON_NODE_HOLDS_OBJECT(root)){
g_printerr("Unknown json message '%s', ignoring", text);
g_object_unref(parser);
goto out;
}
object = json_node_get_object(root);
if (json_object_has_member(object, "sdp")) { //远端的sdp消息
int ret;
GstSDPMessage *sdp;
const gchar *text, *sdptype;
GstWebRTCSessionDescription *answer;
g_assert_cmphex(app_state, ==, PEER_CALL_NEGOTIATING);
child = json_object_get_object_member(object, "sdp");
if (!json_object_has_member(child, "type")) {
cleanup_and_quit_loop("ERROR: received SDP without 'type'", PEER_CALL_ERROR);
goto out;
}
sdptype = json_object_get_string_member(child, "type");
// 在这个例子中,我们默认创建了要约并收到一个答案,但可以注释掉要约创建并等待要约,因此我们在这里处理。
// 请参阅 gst-plugins-bad 中的 tests/examples/webrtcbidirectional.c 以获取另一个示例,了解如何处理来自同行的要约并使用 webrtcbin 回复答案。
text = json_object_get_string_member(child, "sdp");
ret = gst_sdp_message_new(&sdp);
g_assert_cmphex(ret, ==, GST_SDP_OK);
ret = gst_sdp_message_parse_buffer((guint8 *)text, strlen(text), sdp);
g_assert_cmphex(ret, ==, GST_SDP_OK);
if (g_str_equal(sdptype, "answer")){ // answer sdp消息
g_print("Received answer:\n%s\n", text);
answer = gst_webrtc_session_description_new(GST_WEBRTC_SDP_TYPE_ANSWER, sdp);
g_assert_nonnull(answer);
// 设置远端的sdp到pipeline
{
GstPromise *promise = gst_promise_new();
g_signal_emit_by_name(webrtc1, "set-remote-description", answer, promise);
gst_promise_interrupt(promise);
gst_promise_unref(promise);
}
app_state = PEER_CALL_STARTED;
} else { // offer sdp消息
g_print("Received offer:\n%s\n", text);
on_offer_received(sdp);
}
} else if (json_object_has_member(object, "ice")) { // ICE候选人信息
const gchar *candidate;
gint sdpmlineindex;
child = json_object_get_object_member(object, "ice");
candidate = json_object_get_string_member(child, "candidate");
sdpmlineindex = json_object_get_int_member(child, "sdpMLineIndex");
/* Add ice candidate sent by remote peer */
g_signal_emit_by_name(webrtc1, "add-ice-candidate", sdpmlineindex, candidate);
} else {
g_printerr("Ignoring unknown JSON message:\n%s\n", text);
}
g_object_unref(parser);
}
out:
g_free(text);
}
8、创建webrtcbin的Pipeline
#define STUN_SERVER " stun-server=stun://stun.l.google.com:19302 "
#define RTP_CAPS_OPUS "application/x-rtp,media=audio,encoding-name=OPUS,payload="
#define RTP_CAPS_VP8 "application/x-rtp,media=video,encoding-name=VP8,payload="
static gboolean
start_pipeline(void)
{
GstStateChangeReturn ret;
GError *error = NULL;
pipe1 = gst_parse_launch("webrtcbin bundle-policy=max-bundle name=sendrecv " STUN_SERVER
"videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! vp8enc deadline=1 ! rtpvp8pay ! "
"queue ! " RTP_CAPS_VP8 "96 ! sendrecv. " //发送视频
"audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay ! "
"queue ! " RTP_CAPS_OPUS "97 ! sendrecv. ", //发送音频
&error);
if (error) {
g_printerr("Failed to parse launch: %s\n", error->message);
g_error_free(error);
goto err;
}
// 获取webrtcbin插件对象
webrtc1 = gst_bin_get_by_name(GST_BIN(pipe1), "sendrecv");
g_assert_nonnull(webrtc1);
// 这是我们create offer sdp的 gstwebrtc 入口点。当管道进入播放状态时,它将被调用。
g_signal_connect(webrtc1, "on-negotiation-needed", G_CALLBACK(on_negotiation_needed), NULL);
// 我们需要通过 websockets 信令服务器将此 ICE 候选传输到浏览器。来自浏览器的传入 ice 候选也需要我们添加,请参阅 on_server_message()
g_signal_connect(webrtc1, "on-ice-candidate", G_CALLBACK(send_ice_candidate_message), NULL);
g_signal_connect(webrtc1, "notify::ice-gathering-state", G_CALLBACK(on_ice_gathering_state_notify), NULL);
gst_element_set_state(pipe1, GST_STATE_READY);
g_signal_emit_by_name(webrtc1, "create-data-channel", "channel", NULL, &send_channel);
if (send_channel) {
g_print("Created data channel\n");
connect_data_channel_signals(send_channel);
} else {
g_print("Could not create data channel, is usrsctp available?\n");
}
g_signal_connect(webrtc1, "on-data-channel", G_CALLBACK(on_data_channel), NULL);
// 接收到音视频数据on_incoming_stream回调函数处理
g_signal_connect(webrtc1, "pad-added", G_CALLBACK(on_incoming_stream), pipe1);
// 生命周期与管道本身相同, 启动管道
gst_object_unref(webrtc1);
g_print("Starting pipeline\n");
ret = gst_element_set_state(GST_ELEMENT(pipe1), GST_STATE_PLAYING);
if (ret == GST_STATE_CHANGE_FAILURE)
goto err;
return TRUE;
err:
if (pipe1)
g_clear_object(&pipe1);
if (webrtc1)
webrtc1 = NULL;
return FALSE;
}
9、data-channel数据通道数据通信
static void
connect_data_channel_signals(GObject *data_channel)
{
g_signal_connect(data_channel, "on-error", G_CALLBACK(data_channel_on_error), NULL);
g_signal_connect(data_channel, "on-open", G_CALLBACK(data_channel_on_open), NULL);
g_signal_connect(data_channel, "on-close", G_CALLBACK(data_channel_on_close), NULL);
g_signal_connect(data_channel, "on-message-string", G_CALLBACK(data_channel_on_message_string), NULL);
}
10、协商处理
static void
on_negotiation_needed(GstElement *element, gpointer user_data)
{
app_state = PEER_CALL_NEGOTIATING;
// 是否接收远端的sdp,音视频流
if (remote_is_offerer) {
//发送指令获取sdp
gchar *msg = g_strdup_printf("OFFER_REQUEST");
soup_websocket_connection_send_text(ws_conn, msg);
g_free(msg);
} else {
//只发送模式
GArray *transceivers;
g_signal_emit_by_name(element, "get-transceivers", &transceivers);
GstWebRTCRTPTransceiver *transceiver = g_array_index(transceivers, GstWebRTCRTPTransceiver *, 0);
transceiver->direction = GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_SENDONLY;
g_object_set(transceiver, "fec-type", GST_WEBRTC_FEC_TYPE_ULP_RED, NULL);
g_object_set(transceiver, "do-nack", TRUE, NULL);
//创建本地offer sdp,获取本地音视频信息
GstPromise *promise;
promise = gst_promise_new_with_change_func(on_offer_created, user_data, NULL);
g_signal_emit_by_name(webrtc1, "create-offer", NULL, promise);
}
}
11、获取offer sdp后设置本地并且发送给对端
/* Offer created by our pipeline, to be sent to the peer */
static void
on_offer_created(GstPromise *promise, gpointer user_data)
{
GstWebRTCSessionDescription *offer = NULL;
const GstStructure *reply;
g_assert_cmphex(app_state, ==, PEER_CALL_NEGOTIATING);
g_assert_cmphex(gst_promise_wait(promise), ==, GST_PROMISE_RESULT_REPLIED);
reply = gst_promise_get_reply(promise);
gst_structure_get(reply, "offer", GST_TYPE_WEBRTC_SESSION_DESCRIPTION, &offer, NULL);
gst_promise_unref(promise);
promise = gst_promise_new();
g_signal_emit_by_name(webrtc1, "set-local-description", offer, promise);
gst_promise_interrupt(promise);
gst_promise_unref(promise);
/* Send offer to peer */
send_sdp_to_peer(offer);
gst_webrtc_session_description_free(offer);
}
12、接收音视频数据并解码后调用
on_incoming_decodebin_stream
static void
on_incoming_stream(GstElement *webrtc, GstPad *pad, GstElement *pipe)
{
GstElement *decodebin;
GstPad *sinkpad;
if (GST_PAD_DIRECTION(pad) != GST_PAD_SRC)
return;
decodebin = gst_element_factory_make("decodebin", NULL);
g_signal_connect(decodebin, "pad-added", G_CALLBACK(on_incoming_decodebin_stream), pipe);
gst_bin_add(GST_BIN(pipe), decodebin);
gst_element_sync_state_with_parent(decodebin);
sinkpad = gst_element_get_static_pad(decodebin, "sink");
gst_pad_link(pad, sinkpad);
gst_object_unref(sinkpad);
}
13、接收解码后的音视频数据并播放
static void
on_incoming_decodebin_stream(GstElement *decodebin, GstPad *pad,
GstElement *pipe)
{
GstCaps *caps;
const gchar *name;
if (!gst_pad_has_current_caps(pad)) {
g_printerr("Pad '%s' has no caps, can't do anything, ignoring\n", GST_PAD_NAME(pad));
return;
}
caps = gst_pad_get_current_caps(pad);
name = gst_structure_get_name(gst_caps_get_structure(caps, 0));
if (g_str_has_prefix(name, "video")){
handle_media_stream(pad, pipe, "videoconvert", "autovideosink");
} else if (g_str_has_prefix(name, "audio")) {
handle_media_stream(pad, pipe, "audioconvert", "autoaudiosink");
} else {
g_printerr("Unknown pad %s, ignoring", GST_PAD_NAME(pad));
}
}