webrtc-sendrecv: Fix create-answer caps negotiation
[platform/upstream/gstreamer.git] / subprojects / gst-examples / webrtc / sendrecv / gst / webrtc-sendrecv.c
1 /*
2  * Demo gstreamer app for negotiating and streaming a sendrecv webrtc stream
3  * with a browser JS app.
4  *
5  * Build by running: `make webrtc-sendrecv`, or build the gstreamer monorepo.
6  *
7  * Author: Nirbheek Chauhan <nirbheek@centricular.com>
8  */
9 #include <gst/gst.h>
10 #include <gst/sdp/sdp.h>
11 #include <gst/rtp/rtp.h>
12
13 #define GST_USE_UNSTABLE_API
14 #include <gst/webrtc/webrtc.h>
15
16 /* For signalling */
17 #include <libsoup/soup.h>
18 #include <json-glib/json-glib.h>
19
20 #include <string.h>
21
22 enum AppState
23 {
24   APP_STATE_UNKNOWN = 0,
25   APP_STATE_ERROR = 1,          /* generic error */
26   SERVER_CONNECTING = 1000,
27   SERVER_CONNECTION_ERROR,
28   SERVER_CONNECTED,             /* Ready to register */
29   SERVER_REGISTERING = 2000,
30   SERVER_REGISTRATION_ERROR,
31   SERVER_REGISTERED,            /* Ready to call a peer */
32   SERVER_CLOSED,                /* server connection closed by us or the server */
33   PEER_CONNECTING = 3000,
34   PEER_CONNECTION_ERROR,
35   PEER_CONNECTED,
36   PEER_CALL_NEGOTIATING = 4000,
37   PEER_CALL_STARTED,
38   PEER_CALL_STOPPING,
39   PEER_CALL_STOPPED,
40   PEER_CALL_ERROR,
41 };
42
43 #define GST_CAT_DEFAULT webrtc_sendrecv_debug
44 GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
45
46 static GMainLoop *loop;
47 static GstElement *pipe1, *webrtc1 = NULL;
48 static GObject *send_channel, *receive_channel;
49
50 static SoupWebsocketConnection *ws_conn = NULL;
51 static enum AppState app_state = 0;
52 static gchar *peer_id = NULL;
53 static gchar *our_id = NULL;
54 static const gchar *server_url = "wss://webrtc.nirbheek.in:8443";
55 static gboolean disable_ssl = FALSE;
56 static gboolean remote_is_offerer = FALSE;
57
58 static GOptionEntry entries[] = {
59   {"peer-id", 0, 0, G_OPTION_ARG_STRING, &peer_id,
60       "String ID of the peer to connect to", "ID"},
61   {"our-id", 0, 0, G_OPTION_ARG_STRING, &our_id,
62       "String ID that the peer can use to connect to us", "ID"},
63   {"server", 0, 0, G_OPTION_ARG_STRING, &server_url,
64       "Signalling server to connect to", "URL"},
65   {"disable-ssl", 0, 0, G_OPTION_ARG_NONE, &disable_ssl, "Disable ssl", NULL},
66   {"remote-offerer", 0, 0, G_OPTION_ARG_NONE, &remote_is_offerer,
67       "Request that the peer generate the offer and we'll answer", NULL},
68   {NULL},
69 };
70
71 static gboolean
72 cleanup_and_quit_loop (const gchar * msg, enum AppState state)
73 {
74   if (msg)
75     gst_printerr ("%s\n", msg);
76   if (state > 0)
77     app_state = state;
78
79   if (ws_conn) {
80     if (soup_websocket_connection_get_state (ws_conn) ==
81         SOUP_WEBSOCKET_STATE_OPEN)
82       /* This will call us again */
83       soup_websocket_connection_close (ws_conn, 1000, "");
84     else
85       g_clear_object (&ws_conn);
86   }
87
88   if (loop) {
89     g_main_loop_quit (loop);
90     g_clear_pointer (&loop, g_main_loop_unref);
91   }
92
93   /* To allow usage as a GSourceFunc */
94   return G_SOURCE_REMOVE;
95 }
96
97 static gchar *
98 get_string_from_json_object (JsonObject * object)
99 {
100   JsonNode *root;
101   JsonGenerator *generator;
102   gchar *text;
103
104   /* Make it the root node */
105   root = json_node_init_object (json_node_alloc (), object);
106   generator = json_generator_new ();
107   json_generator_set_root (generator, root);
108   text = json_generator_to_data (generator, NULL);
109
110   /* Release everything */
111   g_object_unref (generator);
112   json_node_free (root);
113   return text;
114 }
115
116 static void
117 handle_media_stream (GstPad * pad, GstElement * pipe, const char *convert_name,
118     const char *sink_name)
119 {
120   GstPad *qpad;
121   GstElement *q, *conv, *resample, *sink;
122   GstPadLinkReturn ret;
123
124   gst_println ("Trying to handle stream with %s ! %s", convert_name, sink_name);
125
126   q = gst_element_factory_make ("queue", NULL);
127   g_assert_nonnull (q);
128   conv = gst_element_factory_make (convert_name, NULL);
129   g_assert_nonnull (conv);
130   sink = gst_element_factory_make (sink_name, NULL);
131   g_assert_nonnull (sink);
132
133   if (g_strcmp0 (convert_name, "audioconvert") == 0) {
134     /* Might also need to resample, so add it just in case.
135      * Will be a no-op if it's not required. */
136     resample = gst_element_factory_make ("audioresample", NULL);
137     g_assert_nonnull (resample);
138     gst_bin_add_many (GST_BIN (pipe), q, conv, resample, sink, NULL);
139     gst_element_sync_state_with_parent (q);
140     gst_element_sync_state_with_parent (conv);
141     gst_element_sync_state_with_parent (resample);
142     gst_element_sync_state_with_parent (sink);
143     gst_element_link_many (q, conv, resample, sink, NULL);
144   } else {
145     gst_bin_add_many (GST_BIN (pipe), q, conv, sink, NULL);
146     gst_element_sync_state_with_parent (q);
147     gst_element_sync_state_with_parent (conv);
148     gst_element_sync_state_with_parent (sink);
149     gst_element_link_many (q, conv, sink, NULL);
150   }
151
152   qpad = gst_element_get_static_pad (q, "sink");
153
154   ret = gst_pad_link (pad, qpad);
155   g_assert_cmphex (ret, ==, GST_PAD_LINK_OK);
156 }
157
158 static void
159 on_incoming_decodebin_stream (GstElement * decodebin, GstPad * pad,
160     GstElement * pipe)
161 {
162   GstCaps *caps;
163   const gchar *name;
164
165   if (!gst_pad_has_current_caps (pad)) {
166     gst_printerr ("Pad '%s' has no caps, can't do anything, ignoring\n",
167         GST_PAD_NAME (pad));
168     return;
169   }
170
171   caps = gst_pad_get_current_caps (pad);
172   name = gst_structure_get_name (gst_caps_get_structure (caps, 0));
173
174   if (g_str_has_prefix (name, "video")) {
175     handle_media_stream (pad, pipe, "videoconvert", "autovideosink");
176   } else if (g_str_has_prefix (name, "audio")) {
177     handle_media_stream (pad, pipe, "audioconvert", "autoaudiosink");
178   } else {
179     gst_printerr ("Unknown pad %s, ignoring", GST_PAD_NAME (pad));
180   }
181 }
182
183 static void
184 on_incoming_stream (GstElement * webrtc, GstPad * pad, GstElement * pipe)
185 {
186   GstElement *decodebin;
187   GstPad *sinkpad;
188
189   if (GST_PAD_DIRECTION (pad) != GST_PAD_SRC)
190     return;
191
192   decodebin = gst_element_factory_make ("decodebin", NULL);
193   g_signal_connect (decodebin, "pad-added",
194       G_CALLBACK (on_incoming_decodebin_stream), pipe);
195   gst_bin_add (GST_BIN (pipe), decodebin);
196   gst_element_sync_state_with_parent (decodebin);
197
198   sinkpad = gst_element_get_static_pad (decodebin, "sink");
199   gst_pad_link (pad, sinkpad);
200   gst_object_unref (sinkpad);
201 }
202
203 static void
204 send_ice_candidate_message (GstElement * webrtc G_GNUC_UNUSED, guint mlineindex,
205     gchar * candidate, gpointer user_data G_GNUC_UNUSED)
206 {
207   gchar *text;
208   JsonObject *ice, *msg;
209
210   if (app_state < PEER_CALL_NEGOTIATING) {
211     cleanup_and_quit_loop ("Can't send ICE, not in call", APP_STATE_ERROR);
212     return;
213   }
214
215   ice = json_object_new ();
216   json_object_set_string_member (ice, "candidate", candidate);
217   json_object_set_int_member (ice, "sdpMLineIndex", mlineindex);
218   msg = json_object_new ();
219   json_object_set_object_member (msg, "ice", ice);
220   text = get_string_from_json_object (msg);
221   json_object_unref (msg);
222
223   soup_websocket_connection_send_text (ws_conn, text);
224   g_free (text);
225 }
226
227 static void
228 send_sdp_to_peer (GstWebRTCSessionDescription * desc)
229 {
230   gchar *text;
231   JsonObject *msg, *sdp;
232
233   if (app_state < PEER_CALL_NEGOTIATING) {
234     cleanup_and_quit_loop ("Can't send SDP to peer, not in call",
235         APP_STATE_ERROR);
236     return;
237   }
238
239   text = gst_sdp_message_as_text (desc->sdp);
240   sdp = json_object_new ();
241
242   if (desc->type == GST_WEBRTC_SDP_TYPE_OFFER) {
243     gst_print ("Sending offer:\n%s\n", text);
244     json_object_set_string_member (sdp, "type", "offer");
245   } else if (desc->type == GST_WEBRTC_SDP_TYPE_ANSWER) {
246     gst_print ("Sending answer:\n%s\n", text);
247     json_object_set_string_member (sdp, "type", "answer");
248   } else {
249     g_assert_not_reached ();
250   }
251
252   json_object_set_string_member (sdp, "sdp", text);
253   g_free (text);
254
255   msg = json_object_new ();
256   json_object_set_object_member (msg, "sdp", sdp);
257   text = get_string_from_json_object (msg);
258   json_object_unref (msg);
259
260   soup_websocket_connection_send_text (ws_conn, text);
261   g_free (text);
262 }
263
264 /* Offer created by our pipeline, to be sent to the peer */
265 static void
266 on_offer_created (GstPromise * promise, gpointer user_data)
267 {
268   GstWebRTCSessionDescription *offer = NULL;
269   const GstStructure *reply;
270
271   g_assert_cmphex (app_state, ==, PEER_CALL_NEGOTIATING);
272
273   g_assert_cmphex (gst_promise_wait (promise), ==, GST_PROMISE_RESULT_REPLIED);
274   reply = gst_promise_get_reply (promise);
275   gst_structure_get (reply, "offer",
276       GST_TYPE_WEBRTC_SESSION_DESCRIPTION, &offer, NULL);
277   gst_promise_unref (promise);
278
279   promise = gst_promise_new ();
280   g_signal_emit_by_name (webrtc1, "set-local-description", offer, promise);
281   gst_promise_interrupt (promise);
282   gst_promise_unref (promise);
283
284   /* Send offer to peer */
285   send_sdp_to_peer (offer);
286   gst_webrtc_session_description_free (offer);
287 }
288
289 static void
290 on_negotiation_needed (GstElement * element, gpointer user_data)
291 {
292   gboolean create_offer = GPOINTER_TO_INT (user_data);
293   app_state = PEER_CALL_NEGOTIATING;
294
295   if (remote_is_offerer) {
296     soup_websocket_connection_send_text (ws_conn, "OFFER_REQUEST");
297   } else if (create_offer) {
298     GstPromise *promise =
299         gst_promise_new_with_change_func (on_offer_created, NULL, NULL);
300     g_signal_emit_by_name (webrtc1, "create-offer", NULL, promise);
301   }
302 }
303
304 static void
305 data_channel_on_error (GObject * dc, gpointer user_data)
306 {
307   cleanup_and_quit_loop ("Data channel error", 0);
308 }
309
310 static void
311 data_channel_on_open (GObject * dc, gpointer user_data)
312 {
313   GBytes *bytes = g_bytes_new ("data", strlen ("data"));
314   gst_print ("data channel opened\n");
315   g_signal_emit_by_name (dc, "send-string", "Hi! from GStreamer");
316   g_signal_emit_by_name (dc, "send-data", bytes);
317   g_bytes_unref (bytes);
318 }
319
320 static void
321 data_channel_on_close (GObject * dc, gpointer user_data)
322 {
323   cleanup_and_quit_loop ("Data channel closed", 0);
324 }
325
326 static void
327 data_channel_on_message_string (GObject * dc, gchar * str, gpointer user_data)
328 {
329   gst_print ("Received data channel message: %s\n", str);
330 }
331
332 static void
333 connect_data_channel_signals (GObject * data_channel)
334 {
335   g_signal_connect (data_channel, "on-error",
336       G_CALLBACK (data_channel_on_error), NULL);
337   g_signal_connect (data_channel, "on-open", G_CALLBACK (data_channel_on_open),
338       NULL);
339   g_signal_connect (data_channel, "on-close",
340       G_CALLBACK (data_channel_on_close), NULL);
341   g_signal_connect (data_channel, "on-message-string",
342       G_CALLBACK (data_channel_on_message_string), NULL);
343 }
344
345 static void
346 on_data_channel (GstElement * webrtc, GObject * data_channel,
347     gpointer user_data)
348 {
349   connect_data_channel_signals (data_channel);
350   receive_channel = data_channel;
351 }
352
353 static void
354 on_ice_gathering_state_notify (GstElement * webrtcbin, GParamSpec * pspec,
355     gpointer user_data)
356 {
357   GstWebRTCICEGatheringState ice_gather_state;
358   const gchar *new_state = "unknown";
359
360   g_object_get (webrtcbin, "ice-gathering-state", &ice_gather_state, NULL);
361   switch (ice_gather_state) {
362     case GST_WEBRTC_ICE_GATHERING_STATE_NEW:
363       new_state = "new";
364       break;
365     case GST_WEBRTC_ICE_GATHERING_STATE_GATHERING:
366       new_state = "gathering";
367       break;
368     case GST_WEBRTC_ICE_GATHERING_STATE_COMPLETE:
369       new_state = "complete";
370       break;
371   }
372   gst_print ("ICE gathering state changed to %s\n", new_state);
373 }
374
375 static gboolean webrtcbin_get_stats (GstElement * webrtcbin);
376
377 static gboolean
378 on_webrtcbin_stat (GQuark field_id, const GValue * value, gpointer unused)
379 {
380   if (GST_VALUE_HOLDS_STRUCTURE (value)) {
381     GST_DEBUG ("stat: \'%s\': %" GST_PTR_FORMAT, g_quark_to_string (field_id),
382         gst_value_get_structure (value));
383   } else {
384     GST_FIXME ("unknown field \'%s\' value type: \'%s\'",
385         g_quark_to_string (field_id), g_type_name (G_VALUE_TYPE (value)));
386   }
387
388   return TRUE;
389 }
390
391 static void
392 on_webrtcbin_get_stats (GstPromise * promise, GstElement * webrtcbin)
393 {
394   const GstStructure *stats;
395
396   g_return_if_fail (gst_promise_wait (promise) == GST_PROMISE_RESULT_REPLIED);
397
398   stats = gst_promise_get_reply (promise);
399   gst_structure_foreach (stats, on_webrtcbin_stat, NULL);
400
401   g_timeout_add (100, (GSourceFunc) webrtcbin_get_stats, webrtcbin);
402 }
403
404 static gboolean
405 webrtcbin_get_stats (GstElement * webrtcbin)
406 {
407   GstPromise *promise;
408
409   promise =
410       gst_promise_new_with_change_func (
411       (GstPromiseChangeFunc) on_webrtcbin_get_stats, webrtcbin, NULL);
412
413   GST_TRACE ("emitting get-stats on %" GST_PTR_FORMAT, webrtcbin);
414   g_signal_emit_by_name (webrtcbin, "get-stats", NULL, promise);
415   gst_promise_unref (promise);
416
417   return G_SOURCE_REMOVE;
418 }
419
420
421 #define STUN_SERVER " stun-server=stun://stun.l.google.com:19302 "
422 #define RTP_TWCC_URI "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01"
423 #define RTP_CAPS_OPUS "application/x-rtp,media=audio,encoding-name=OPUS"
424 #define RTP_OPUS_DEFAULT_PT 97
425 #define RTP_CAPS_VP8 "application/x-rtp,media=video,encoding-name=VP8"
426 #define RTP_VP8_DEFAULT_PT 96
427
428 static gboolean
429 start_pipeline (gboolean create_offer, guint opus_pt, guint vp8_pt)
430 {
431   char *pipeline;
432   GstStateChangeReturn ret;
433   GError *error = NULL;
434
435   pipeline =
436       g_strdup_printf ("webrtcbin bundle-policy=max-bundle name=sendrecv "
437       STUN_SERVER
438       "videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! "
439       /* increase the default keyframe distance, browsers have really long
440        * periods between keyframes and rely on PLI events on packet loss to
441        * fix corrupted video.
442        */
443       "vp8enc deadline=1 keyframe-max-dist=2000 ! "
444       /* picture-id-mode=15-bit seems to make TWCC stats behave better, and
445        * fixes stuttery video playback in Chrome */
446       "rtpvp8pay name=videopay picture-id-mode=15-bit ! "
447       "queue ! %s,payload=%u ! sendrecv. "
448       "audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay name=audiopay ! "
449       "queue ! %s,payload=%u ! sendrecv. ", RTP_CAPS_VP8, vp8_pt,
450       RTP_CAPS_OPUS, opus_pt);
451
452   pipe1 = gst_parse_launch (pipeline, &error);
453   g_free (pipeline);
454   if (error) {
455     gst_printerr ("Failed to parse launch: %s\n", error->message);
456     g_error_free (error);
457     goto err;
458   }
459
460   webrtc1 = gst_bin_get_by_name (GST_BIN (pipe1), "sendrecv");
461   g_assert_nonnull (webrtc1);
462
463   if (!create_offer) {
464     /* XXX: this will fail when the remote offers twcc as the extension id
465      * cannot currently be negotiated when receiving an offer.
466      */
467     GST_FIXME ("Need to implement header extension negotiation when "
468         "reciving a remote offers");
469   } else {
470     GstElement *videopay, *audiopay;
471     GstRTPHeaderExtension *video_twcc, *audio_twcc;
472
473     videopay = gst_bin_get_by_name (GST_BIN (pipe1), "videopay");
474     g_assert_nonnull (videopay);
475     video_twcc = gst_rtp_header_extension_create_from_uri (RTP_TWCC_URI);
476     g_assert_nonnull (video_twcc);
477     gst_rtp_header_extension_set_id (video_twcc, 1);
478     g_signal_emit_by_name (videopay, "add-extension", video_twcc);
479     g_clear_object (&video_twcc);
480     g_clear_object (&videopay);
481
482     audiopay = gst_bin_get_by_name (GST_BIN (pipe1), "audiopay");
483     g_assert_nonnull (audiopay);
484     audio_twcc = gst_rtp_header_extension_create_from_uri (RTP_TWCC_URI);
485     g_assert_nonnull (audio_twcc);
486     gst_rtp_header_extension_set_id (audio_twcc, 1);
487     g_signal_emit_by_name (audiopay, "add-extension", audio_twcc);
488     g_clear_object (&audio_twcc);
489     g_clear_object (&audiopay);
490   }
491
492   /* This is the gstwebrtc entry point where we create the offer and so on. It
493    * will be called when the pipeline goes to PLAYING. */
494   g_signal_connect (webrtc1, "on-negotiation-needed",
495       G_CALLBACK (on_negotiation_needed), GINT_TO_POINTER (create_offer));
496   /* We need to transmit this ICE candidate to the browser via the websockets
497    * signalling server. Incoming ice candidates from the browser need to be
498    * added by us too, see on_server_message() */
499   g_signal_connect (webrtc1, "on-ice-candidate",
500       G_CALLBACK (send_ice_candidate_message), NULL);
501   g_signal_connect (webrtc1, "notify::ice-gathering-state",
502       G_CALLBACK (on_ice_gathering_state_notify), NULL);
503
504   gst_element_set_state (pipe1, GST_STATE_READY);
505
506   g_signal_emit_by_name (webrtc1, "create-data-channel", "channel", NULL,
507       &send_channel);
508   if (send_channel) {
509     gst_print ("Created data channel\n");
510     connect_data_channel_signals (send_channel);
511   } else {
512     gst_print ("Could not create data channel, is usrsctp available?\n");
513   }
514
515   g_signal_connect (webrtc1, "on-data-channel", G_CALLBACK (on_data_channel),
516       NULL);
517   /* Incoming streams will be exposed via this signal */
518   g_signal_connect (webrtc1, "pad-added", G_CALLBACK (on_incoming_stream),
519       pipe1);
520   /* Lifetime is the same as the pipeline itself */
521   gst_object_unref (webrtc1);
522
523   g_timeout_add (100, (GSourceFunc) webrtcbin_get_stats, webrtc1);
524
525   gst_print ("Starting pipeline\n");
526   ret = gst_element_set_state (GST_ELEMENT (pipe1), GST_STATE_PLAYING);
527   if (ret == GST_STATE_CHANGE_FAILURE)
528     goto err;
529
530   return TRUE;
531
532 err:
533   if (pipe1)
534     g_clear_object (&pipe1);
535   if (webrtc1)
536     webrtc1 = NULL;
537   return FALSE;
538 }
539
540 static gboolean
541 setup_call (void)
542 {
543   gchar *msg;
544
545   if (soup_websocket_connection_get_state (ws_conn) !=
546       SOUP_WEBSOCKET_STATE_OPEN)
547     return FALSE;
548
549   if (!peer_id)
550     return FALSE;
551
552   gst_print ("Setting up signalling server call with %s\n", peer_id);
553   app_state = PEER_CONNECTING;
554   msg = g_strdup_printf ("SESSION %s", peer_id);
555   soup_websocket_connection_send_text (ws_conn, msg);
556   g_free (msg);
557   return TRUE;
558 }
559
560 static gboolean
561 register_with_server (void)
562 {
563   gchar *hello;
564
565   if (soup_websocket_connection_get_state (ws_conn) !=
566       SOUP_WEBSOCKET_STATE_OPEN)
567     return FALSE;
568
569   if (!our_id) {
570     gint32 id;
571
572     id = g_random_int_range (10, 10000);
573     gst_print ("Registering id %i with server\n", id);
574
575     hello = g_strdup_printf ("HELLO %i", id);
576   } else {
577     gst_print ("Registering id %s with server\n", our_id);
578
579     hello = g_strdup_printf ("HELLO %s", our_id);
580   }
581
582   app_state = SERVER_REGISTERING;
583
584   /* Register with the server with a random integer id. Reply will be received
585    * by on_server_message() */
586   soup_websocket_connection_send_text (ws_conn, hello);
587   g_free (hello);
588
589   return TRUE;
590 }
591
592 static void
593 on_server_closed (SoupWebsocketConnection * conn G_GNUC_UNUSED,
594     gpointer user_data G_GNUC_UNUSED)
595 {
596   app_state = SERVER_CLOSED;
597   cleanup_and_quit_loop ("Server connection closed", 0);
598 }
599
600 /* Answer created by our pipeline, to be sent to the peer */
601 static void
602 on_answer_created (GstPromise * promise, gpointer user_data)
603 {
604   GstWebRTCSessionDescription *answer = NULL;
605   const GstStructure *reply;
606
607   g_assert_cmphex (app_state, ==, PEER_CALL_NEGOTIATING);
608
609   g_assert_cmphex (gst_promise_wait (promise), ==, GST_PROMISE_RESULT_REPLIED);
610   reply = gst_promise_get_reply (promise);
611   gst_structure_get (reply, "answer",
612       GST_TYPE_WEBRTC_SESSION_DESCRIPTION, &answer, NULL);
613   gst_promise_unref (promise);
614
615   promise = gst_promise_new ();
616   g_signal_emit_by_name (webrtc1, "set-local-description", answer, promise);
617   gst_promise_interrupt (promise);
618   gst_promise_unref (promise);
619
620   /* Send answer to peer */
621   send_sdp_to_peer (answer);
622   gst_webrtc_session_description_free (answer);
623 }
624
625 static void
626 on_offer_set (GstPromise * promise, gpointer user_data)
627 {
628   gst_promise_unref (promise);
629   promise = gst_promise_new_with_change_func (on_answer_created, NULL, NULL);
630   g_signal_emit_by_name (webrtc1, "create-answer", NULL, promise);
631 }
632
633 static void
634 on_offer_received (GstSDPMessage * sdp)
635 {
636   GstWebRTCSessionDescription *offer = NULL;
637   GstPromise *promise;
638
639   /* If we got an offer and we have no webrtcbin, we need to parse the SDP,
640    * get the payload types, then start the pipeline */
641   if (!webrtc1 && our_id) {
642     guint medias_len, formats_len;
643     guint opus_pt = 0, vp8_pt = 0;
644
645     gst_println ("Parsing offer to find payload types");
646
647     medias_len = gst_sdp_message_medias_len (sdp);
648     for (int i = 0; i < medias_len; i++) {
649       const GstSDPMedia *media = gst_sdp_message_get_media (sdp, i);
650       formats_len = gst_sdp_media_formats_len (media);
651       for (int j = 0; j < formats_len; j++) {
652         guint pt;
653         GstCaps *caps;
654         GstStructure *s;
655         const char *fmt, *encoding_name;
656
657         fmt = gst_sdp_media_get_format (media, j);
658         if (g_strcmp0 (fmt, "webrtc-datachannel") == 0)
659           continue;
660         pt = atoi (fmt);
661         caps = gst_sdp_media_get_caps_from_media (media, pt);
662         s = gst_caps_get_structure (caps, 0);
663         encoding_name = gst_structure_get_string (s, "encoding-name");
664         if (vp8_pt == 0 && g_strcmp0 (encoding_name, "VP8") == 0)
665           vp8_pt = pt;
666         if (opus_pt == 0 && g_strcmp0 (encoding_name, "OPUS") == 0)
667           opus_pt = pt;
668       }
669     }
670
671     g_assert_cmpint (opus_pt, !=, 0);
672     g_assert_cmpint (vp8_pt, !=, 0);
673
674     gst_println ("Starting pipeline with opus pt: %u vp8 pt: %u", opus_pt,
675         vp8_pt);
676
677     if (!start_pipeline (FALSE, opus_pt, vp8_pt)) {
678       cleanup_and_quit_loop ("ERROR: failed to start pipeline",
679           PEER_CALL_ERROR);
680     }
681   }
682
683   offer = gst_webrtc_session_description_new (GST_WEBRTC_SDP_TYPE_OFFER, sdp);
684   g_assert_nonnull (offer);
685
686   /* Set remote description on our pipeline */
687   {
688     promise = gst_promise_new_with_change_func (on_offer_set, NULL, NULL);
689     g_signal_emit_by_name (webrtc1, "set-remote-description", offer, promise);
690   }
691   gst_webrtc_session_description_free (offer);
692 }
693
694 /* One mega message handler for our asynchronous calling mechanism */
695 static void
696 on_server_message (SoupWebsocketConnection * conn, SoupWebsocketDataType type,
697     GBytes * message, gpointer user_data)
698 {
699   gchar *text;
700
701   switch (type) {
702     case SOUP_WEBSOCKET_DATA_BINARY:
703       gst_printerr ("Received unknown binary message, ignoring\n");
704       return;
705     case SOUP_WEBSOCKET_DATA_TEXT:{
706       gsize size;
707       const gchar *data = g_bytes_get_data (message, &size);
708       /* Convert to NULL-terminated string */
709       text = g_strndup (data, size);
710       break;
711     }
712     default:
713       g_assert_not_reached ();
714   }
715
716   if (g_strcmp0 (text, "HELLO") == 0) {
717     /* Server has accepted our registration, we are ready to send commands */
718     if (app_state != SERVER_REGISTERING) {
719       cleanup_and_quit_loop ("ERROR: Received HELLO when not registering",
720           APP_STATE_ERROR);
721       goto out;
722     }
723     app_state = SERVER_REGISTERED;
724     gst_print ("Registered with server\n");
725     if (!our_id) {
726       /* Ask signalling server to connect us with a specific peer */
727       if (!setup_call ()) {
728         cleanup_and_quit_loop ("ERROR: Failed to setup call", PEER_CALL_ERROR);
729         goto out;
730       }
731     } else {
732       gst_println ("Waiting for connection from peer (our-id: %s)", our_id);
733     }
734   } else if (g_strcmp0 (text, "SESSION_OK") == 0) {
735     /* The call initiated by us has been setup by the server; now we can start
736      * negotiation */
737     if (app_state != PEER_CONNECTING) {
738       cleanup_and_quit_loop ("ERROR: Received SESSION_OK when not calling",
739           PEER_CONNECTION_ERROR);
740       goto out;
741     }
742
743     app_state = PEER_CONNECTED;
744     /* Start negotiation (exchange SDP and ICE candidates) */
745     if (!start_pipeline (TRUE, RTP_OPUS_DEFAULT_PT, RTP_VP8_DEFAULT_PT))
746       cleanup_and_quit_loop ("ERROR: failed to start pipeline",
747           PEER_CALL_ERROR);
748   } else if (g_strcmp0 (text, "OFFER_REQUEST") == 0) {
749     if (app_state != SERVER_REGISTERED) {
750       gst_printerr ("Received OFFER_REQUEST at a strange time, ignoring\n");
751       goto out;
752     }
753     gst_print ("Received OFFER_REQUEST, sending offer\n");
754     /* Peer wants us to start negotiation (exchange SDP and ICE candidates) */
755     if (!start_pipeline (TRUE, RTP_OPUS_DEFAULT_PT, RTP_VP8_DEFAULT_PT))
756       cleanup_and_quit_loop ("ERROR: failed to start pipeline",
757           PEER_CALL_ERROR);
758   } else if (g_str_has_prefix (text, "ERROR")) {
759     /* Handle errors */
760     switch (app_state) {
761       case SERVER_CONNECTING:
762         app_state = SERVER_CONNECTION_ERROR;
763         break;
764       case SERVER_REGISTERING:
765         app_state = SERVER_REGISTRATION_ERROR;
766         break;
767       case PEER_CONNECTING:
768         app_state = PEER_CONNECTION_ERROR;
769         break;
770       case PEER_CONNECTED:
771       case PEER_CALL_NEGOTIATING:
772         app_state = PEER_CALL_ERROR;
773         break;
774       default:
775         app_state = APP_STATE_ERROR;
776     }
777     cleanup_and_quit_loop (text, 0);
778   } else {
779     /* Look for JSON messages containing SDP and ICE candidates */
780     JsonNode *root;
781     JsonObject *object, *child;
782     JsonParser *parser = json_parser_new ();
783     if (!json_parser_load_from_data (parser, text, -1, NULL)) {
784       gst_printerr ("Unknown message '%s', ignoring\n", text);
785       g_object_unref (parser);
786       goto out;
787     }
788
789     root = json_parser_get_root (parser);
790     if (!JSON_NODE_HOLDS_OBJECT (root)) {
791       gst_printerr ("Unknown json message '%s', ignoring\n", text);
792       g_object_unref (parser);
793       goto out;
794     }
795
796     object = json_node_get_object (root);
797     /* Check type of JSON message */
798     if (json_object_has_member (object, "sdp")) {
799       int ret;
800       GstSDPMessage *sdp;
801       const gchar *text, *sdptype;
802       GstWebRTCSessionDescription *answer;
803
804       app_state = PEER_CALL_NEGOTIATING;
805
806       child = json_object_get_object_member (object, "sdp");
807
808       if (!json_object_has_member (child, "type")) {
809         cleanup_and_quit_loop ("ERROR: received SDP without 'type'",
810             PEER_CALL_ERROR);
811         goto out;
812       }
813
814       sdptype = json_object_get_string_member (child, "type");
815       /* In this example, we create the offer and receive one answer by default,
816        * but it's possible to comment out the offer creation and wait for an offer
817        * instead, so we handle either here.
818        *
819        * See tests/examples/webrtcbidirectional.c in gst-plugins-bad for another
820        * example how to handle offers from peers and reply with answers using webrtcbin. */
821       text = json_object_get_string_member (child, "sdp");
822       ret = gst_sdp_message_new (&sdp);
823       g_assert_cmphex (ret, ==, GST_SDP_OK);
824       ret = gst_sdp_message_parse_buffer ((guint8 *) text, strlen (text), sdp);
825       g_assert_cmphex (ret, ==, GST_SDP_OK);
826
827       if (g_str_equal (sdptype, "answer")) {
828         gst_print ("Received answer:\n%s\n", text);
829         answer = gst_webrtc_session_description_new (GST_WEBRTC_SDP_TYPE_ANSWER,
830             sdp);
831         g_assert_nonnull (answer);
832
833         /* Set remote description on our pipeline */
834         {
835           GstPromise *promise = gst_promise_new ();
836           g_signal_emit_by_name (webrtc1, "set-remote-description", answer,
837               promise);
838           gst_promise_interrupt (promise);
839           gst_promise_unref (promise);
840         }
841         app_state = PEER_CALL_STARTED;
842       } else {
843         gst_print ("Received offer:\n%s\n", text);
844         on_offer_received (sdp);
845       }
846
847     } else if (json_object_has_member (object, "ice")) {
848       const gchar *candidate;
849       gint sdpmlineindex;
850
851       child = json_object_get_object_member (object, "ice");
852       candidate = json_object_get_string_member (child, "candidate");
853       sdpmlineindex = json_object_get_int_member (child, "sdpMLineIndex");
854
855       /* Add ice candidate sent by remote peer */
856       g_signal_emit_by_name (webrtc1, "add-ice-candidate", sdpmlineindex,
857           candidate);
858     } else {
859       gst_printerr ("Ignoring unknown JSON message:\n%s\n", text);
860     }
861     g_object_unref (parser);
862   }
863
864 out:
865   g_free (text);
866 }
867
868 static void
869 on_server_connected (SoupSession * session, GAsyncResult * res,
870     SoupMessage * msg)
871 {
872   GError *error = NULL;
873
874   ws_conn = soup_session_websocket_connect_finish (session, res, &error);
875   if (error) {
876     cleanup_and_quit_loop (error->message, SERVER_CONNECTION_ERROR);
877     g_error_free (error);
878     return;
879   }
880
881   g_assert_nonnull (ws_conn);
882
883   app_state = SERVER_CONNECTED;
884   gst_print ("Connected to signalling server\n");
885
886   g_signal_connect (ws_conn, "closed", G_CALLBACK (on_server_closed), NULL);
887   g_signal_connect (ws_conn, "message", G_CALLBACK (on_server_message), NULL);
888
889   /* Register with the server so it knows about us and can accept commands */
890   register_with_server ();
891 }
892
893 /*
894  * Connect to the signalling server. This is the entrypoint for everything else.
895  */
896 static void
897 connect_to_websocket_server_async (void)
898 {
899   SoupLogger *logger;
900   SoupMessage *message;
901   SoupSession *session;
902   const char *https_aliases[] = { "wss", NULL };
903
904   session =
905       soup_session_new_with_options (SOUP_SESSION_SSL_STRICT, !disable_ssl,
906       SOUP_SESSION_SSL_USE_SYSTEM_CA_FILE, TRUE,
907       //SOUP_SESSION_SSL_CA_FILE, "/etc/ssl/certs/ca-bundle.crt",
908       SOUP_SESSION_HTTPS_ALIASES, https_aliases, NULL);
909
910   logger = soup_logger_new (SOUP_LOGGER_LOG_BODY, -1);
911   soup_session_add_feature (session, SOUP_SESSION_FEATURE (logger));
912   g_object_unref (logger);
913
914   message = soup_message_new (SOUP_METHOD_GET, server_url);
915
916   gst_print ("Connecting to server...\n");
917
918   /* Once connected, we will register */
919   soup_session_websocket_connect_async (session, message, NULL, NULL, NULL,
920       (GAsyncReadyCallback) on_server_connected, message);
921   app_state = SERVER_CONNECTING;
922 }
923
924 static gboolean
925 check_plugins (void)
926 {
927   int i;
928   gboolean ret;
929   GstPlugin *plugin;
930   GstRegistry *registry;
931   const gchar *needed[] = { "opus", "vpx", "nice", "webrtc", "dtls", "srtp",
932     "rtpmanager", "videotestsrc", "audiotestsrc", NULL
933   };
934
935   registry = gst_registry_get ();
936   ret = TRUE;
937   for (i = 0; i < g_strv_length ((gchar **) needed); i++) {
938     plugin = gst_registry_find_plugin (registry, needed[i]);
939     if (!plugin) {
940       gst_print ("Required gstreamer plugin '%s' not found\n", needed[i]);
941       ret = FALSE;
942       continue;
943     }
944     gst_object_unref (plugin);
945   }
946   return ret;
947 }
948
949 int
950 main (int argc, char *argv[])
951 {
952   GOptionContext *context;
953   GError *error = NULL;
954   int ret_code = -1;
955
956   context = g_option_context_new ("- gstreamer webrtc sendrecv demo");
957   g_option_context_add_main_entries (context, entries, NULL);
958   g_option_context_add_group (context, gst_init_get_option_group ());
959   if (!g_option_context_parse (context, &argc, &argv, &error)) {
960     gst_printerr ("Error initializing: %s\n", error->message);
961     return -1;
962   }
963
964   GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "webrtc-sendrecv", 0,
965       "WebRTC Sending and Receiving example");
966
967   if (!check_plugins ()) {
968     goto out;
969   }
970
971   if (!peer_id && !our_id) {
972     gst_printerr ("--peer-id or --our-id is a required argument\n");
973     goto out;
974   }
975
976   if (peer_id && our_id) {
977     gst_printerr ("specify only --peer-id or --our-id\n");
978     goto out;
979   }
980
981   ret_code = 0;
982
983   /* Disable ssl when running a localhost server, because
984    * it's probably a test server with a self-signed certificate */
985   {
986     GstUri *uri = gst_uri_from_string (server_url);
987     if (g_strcmp0 ("localhost", gst_uri_get_host (uri)) == 0 ||
988         g_strcmp0 ("127.0.0.1", gst_uri_get_host (uri)) == 0)
989       disable_ssl = TRUE;
990     gst_uri_unref (uri);
991   }
992
993   loop = g_main_loop_new (NULL, FALSE);
994
995   connect_to_websocket_server_async ();
996
997   g_main_loop_run (loop);
998
999   if (loop)
1000     g_main_loop_unref (loop);
1001
1002   if (pipe1) {
1003     gst_element_set_state (GST_ELEMENT (pipe1), GST_STATE_NULL);
1004     gst_print ("Pipeline stopped\n");
1005     gst_object_unref (pipe1);
1006   }
1007
1008 out:
1009   g_free (peer_id);
1010   g_free (our_id);
1011
1012   return ret_code;
1013 }