webrtc examples: Force regular non-MULTIOPUS
[platform/upstream/gstreamer.git] / subprojects / gst-examples / webrtc / sendrecv / gst / webrtc-sendrecv.c
1 /*
2  * Demo gstreamer app for negotiating and streaming a sendrecv webrtc stream
3  * with a browser JS app.
4  *
5  * Build by running: `make webrtc-sendrecv`, or build the gstreamer monorepo.
6  *
7  * Author: Nirbheek Chauhan <nirbheek@centricular.com>
8  */
9 #include <gst/gst.h>
10 #include <gst/sdp/sdp.h>
11 #include <gst/rtp/rtp.h>
12
13 #include <gst/webrtc/webrtc.h>
14 #include <gst/webrtc/nice/nice.h>
15
16 #include "custom_agent.h"
17
18 /* For signalling */
19 #include <libsoup/soup.h>
20 #include <json-glib/json-glib.h>
21
22 #include <string.h>
23
24 enum AppState
25 {
26   APP_STATE_UNKNOWN = 0,
27   APP_STATE_ERROR = 1,          /* generic error */
28   SERVER_CONNECTING = 1000,
29   SERVER_CONNECTION_ERROR,
30   SERVER_CONNECTED,             /* Ready to register */
31   SERVER_REGISTERING = 2000,
32   SERVER_REGISTRATION_ERROR,
33   SERVER_REGISTERED,            /* Ready to call a peer */
34   SERVER_CLOSED,                /* server connection closed by us or the server */
35   PEER_CONNECTING = 3000,
36   PEER_CONNECTION_ERROR,
37   PEER_CONNECTED,
38   PEER_CALL_NEGOTIATING = 4000,
39   PEER_CALL_STARTED,
40   PEER_CALL_STOPPING,
41   PEER_CALL_STOPPED,
42   PEER_CALL_ERROR,
43 };
44
45 #define GST_CAT_DEFAULT webrtc_sendrecv_debug
46 GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
47
48 static GMainLoop *loop;
49 static GstElement *pipe1, *webrtc1, *audio_bin, *video_bin = NULL;
50 static GObject *send_channel, *receive_channel;
51
52 static SoupWebsocketConnection *ws_conn = NULL;
53 static enum AppState app_state = 0;
54 static gchar *peer_id = NULL;
55 static gchar *our_id = NULL;
56 static const gchar *server_url = "wss://webrtc.nirbheek.in:8443";
57 static gboolean disable_ssl = FALSE;
58 static gboolean remote_is_offerer = FALSE;
59 static gboolean custom_ice = FALSE;
60
61 static GOptionEntry entries[] = {
62   {"peer-id", 0, 0, G_OPTION_ARG_STRING, &peer_id,
63       "String ID of the peer to connect to", "ID"},
64   {"our-id", 0, 0, G_OPTION_ARG_STRING, &our_id,
65       "String ID that the peer can use to connect to us", "ID"},
66   {"server", 0, 0, G_OPTION_ARG_STRING, &server_url,
67       "Signalling server to connect to", "URL"},
68   {"disable-ssl", 0, 0, G_OPTION_ARG_NONE, &disable_ssl, "Disable ssl", NULL},
69   {"remote-offerer", 0, 0, G_OPTION_ARG_NONE, &remote_is_offerer,
70       "Request that the peer generate the offer and we'll answer", NULL},
71   {"custom-ice", 0, 0, G_OPTION_ARG_NONE, &custom_ice,
72       "Use a custom ice agent", NULL},
73   {NULL},
74 };
75
76 static gboolean
77 cleanup_and_quit_loop (const gchar * msg, enum AppState state)
78 {
79   if (msg)
80     gst_printerr ("%s\n", msg);
81   if (state > 0)
82     app_state = state;
83
84   if (ws_conn) {
85     if (soup_websocket_connection_get_state (ws_conn) ==
86         SOUP_WEBSOCKET_STATE_OPEN)
87       /* This will call us again */
88       soup_websocket_connection_close (ws_conn, 1000, "");
89     else
90       g_clear_object (&ws_conn);
91   }
92
93   if (loop) {
94     g_main_loop_quit (loop);
95     g_clear_pointer (&loop, g_main_loop_unref);
96   }
97
98   /* To allow usage as a GSourceFunc */
99   return G_SOURCE_REMOVE;
100 }
101
102 static gchar *
103 get_string_from_json_object (JsonObject * object)
104 {
105   JsonNode *root;
106   JsonGenerator *generator;
107   gchar *text;
108
109   /* Make it the root node */
110   root = json_node_init_object (json_node_alloc (), object);
111   generator = json_generator_new ();
112   json_generator_set_root (generator, root);
113   text = json_generator_to_data (generator, NULL);
114
115   /* Release everything */
116   g_object_unref (generator);
117   json_node_free (root);
118   return text;
119 }
120
121 static void
122 handle_media_stream (GstPad * pad, GstElement * pipe, const char *convert_name,
123     const char *sink_name)
124 {
125   GstPad *qpad;
126   GstElement *q, *conv, *resample, *sink;
127   GstPadLinkReturn ret;
128
129   gst_println ("Trying to handle stream with %s ! %s", convert_name, sink_name);
130
131   q = gst_element_factory_make ("queue", NULL);
132   g_assert_nonnull (q);
133   conv = gst_element_factory_make (convert_name, NULL);
134   g_assert_nonnull (conv);
135   sink = gst_element_factory_make (sink_name, NULL);
136   g_assert_nonnull (sink);
137
138   if (g_strcmp0 (convert_name, "audioconvert") == 0) {
139     /* Might also need to resample, so add it just in case.
140      * Will be a no-op if it's not required. */
141     resample = gst_element_factory_make ("audioresample", NULL);
142     g_assert_nonnull (resample);
143     gst_bin_add_many (GST_BIN (pipe), q, conv, resample, sink, NULL);
144     gst_element_sync_state_with_parent (q);
145     gst_element_sync_state_with_parent (conv);
146     gst_element_sync_state_with_parent (resample);
147     gst_element_sync_state_with_parent (sink);
148     gst_element_link_many (q, conv, resample, sink, NULL);
149   } else {
150     gst_bin_add_many (GST_BIN (pipe), q, conv, sink, NULL);
151     gst_element_sync_state_with_parent (q);
152     gst_element_sync_state_with_parent (conv);
153     gst_element_sync_state_with_parent (sink);
154     gst_element_link_many (q, conv, sink, NULL);
155   }
156
157   qpad = gst_element_get_static_pad (q, "sink");
158
159   ret = gst_pad_link (pad, qpad);
160   g_assert_cmphex (ret, ==, GST_PAD_LINK_OK);
161 }
162
163 static void
164 on_incoming_decodebin_stream (GstElement * decodebin, GstPad * pad,
165     GstElement * pipe)
166 {
167   GstCaps *caps;
168   const gchar *name;
169
170   if (!gst_pad_has_current_caps (pad)) {
171     gst_printerr ("Pad '%s' has no caps, can't do anything, ignoring\n",
172         GST_PAD_NAME (pad));
173     return;
174   }
175
176   caps = gst_pad_get_current_caps (pad);
177   name = gst_structure_get_name (gst_caps_get_structure (caps, 0));
178
179   if (g_str_has_prefix (name, "video")) {
180     handle_media_stream (pad, pipe, "videoconvert", "autovideosink");
181   } else if (g_str_has_prefix (name, "audio")) {
182     handle_media_stream (pad, pipe, "audioconvert", "autoaudiosink");
183   } else {
184     gst_printerr ("Unknown pad %s, ignoring", GST_PAD_NAME (pad));
185   }
186 }
187
188 static void
189 on_incoming_stream (GstElement * webrtc, GstPad * pad, GstElement * pipe)
190 {
191   GstElement *decodebin;
192   GstPad *sinkpad;
193
194   if (GST_PAD_DIRECTION (pad) != GST_PAD_SRC)
195     return;
196
197   decodebin = gst_element_factory_make ("decodebin", NULL);
198   g_signal_connect (decodebin, "pad-added",
199       G_CALLBACK (on_incoming_decodebin_stream), pipe);
200   gst_bin_add (GST_BIN (pipe), decodebin);
201   gst_element_sync_state_with_parent (decodebin);
202
203   sinkpad = gst_element_get_static_pad (decodebin, "sink");
204   gst_pad_link (pad, sinkpad);
205   gst_object_unref (sinkpad);
206 }
207
208 static void
209 send_ice_candidate_message (GstElement * webrtc G_GNUC_UNUSED, guint mlineindex,
210     gchar * candidate, gpointer user_data G_GNUC_UNUSED)
211 {
212   gchar *text;
213   JsonObject *ice, *msg;
214
215   if (app_state < PEER_CALL_NEGOTIATING) {
216     cleanup_and_quit_loop ("Can't send ICE, not in call", APP_STATE_ERROR);
217     return;
218   }
219
220   ice = json_object_new ();
221   json_object_set_string_member (ice, "candidate", candidate);
222   json_object_set_int_member (ice, "sdpMLineIndex", mlineindex);
223   msg = json_object_new ();
224   json_object_set_object_member (msg, "ice", ice);
225   text = get_string_from_json_object (msg);
226   json_object_unref (msg);
227
228   soup_websocket_connection_send_text (ws_conn, text);
229   g_free (text);
230 }
231
232 static void
233 send_sdp_to_peer (GstWebRTCSessionDescription * desc)
234 {
235   gchar *text;
236   JsonObject *msg, *sdp;
237
238   if (app_state < PEER_CALL_NEGOTIATING) {
239     cleanup_and_quit_loop ("Can't send SDP to peer, not in call",
240         APP_STATE_ERROR);
241     return;
242   }
243
244   text = gst_sdp_message_as_text (desc->sdp);
245   sdp = json_object_new ();
246
247   if (desc->type == GST_WEBRTC_SDP_TYPE_OFFER) {
248     gst_print ("Sending offer:\n%s\n", text);
249     json_object_set_string_member (sdp, "type", "offer");
250   } else if (desc->type == GST_WEBRTC_SDP_TYPE_ANSWER) {
251     gst_print ("Sending answer:\n%s\n", text);
252     json_object_set_string_member (sdp, "type", "answer");
253   } else {
254     g_assert_not_reached ();
255   }
256
257   json_object_set_string_member (sdp, "sdp", text);
258   g_free (text);
259
260   msg = json_object_new ();
261   json_object_set_object_member (msg, "sdp", sdp);
262   text = get_string_from_json_object (msg);
263   json_object_unref (msg);
264
265   soup_websocket_connection_send_text (ws_conn, text);
266   g_free (text);
267 }
268
269 /* Offer created by our pipeline, to be sent to the peer */
270 static void
271 on_offer_created (GstPromise * promise, gpointer user_data)
272 {
273   GstWebRTCSessionDescription *offer = NULL;
274   const GstStructure *reply;
275
276   g_assert_cmphex (app_state, ==, PEER_CALL_NEGOTIATING);
277
278   g_assert_cmphex (gst_promise_wait (promise), ==, GST_PROMISE_RESULT_REPLIED);
279   reply = gst_promise_get_reply (promise);
280   gst_structure_get (reply, "offer",
281       GST_TYPE_WEBRTC_SESSION_DESCRIPTION, &offer, NULL);
282   gst_promise_unref (promise);
283
284   promise = gst_promise_new ();
285   g_signal_emit_by_name (webrtc1, "set-local-description", offer, promise);
286   gst_promise_interrupt (promise);
287   gst_promise_unref (promise);
288
289   /* Send offer to peer */
290   send_sdp_to_peer (offer);
291   gst_webrtc_session_description_free (offer);
292 }
293
294 static void
295 on_negotiation_needed (GstElement * element, gpointer user_data)
296 {
297   gboolean create_offer = GPOINTER_TO_INT (user_data);
298   app_state = PEER_CALL_NEGOTIATING;
299
300   if (remote_is_offerer) {
301     soup_websocket_connection_send_text (ws_conn, "OFFER_REQUEST");
302   } else if (create_offer) {
303     GstPromise *promise =
304         gst_promise_new_with_change_func (on_offer_created, NULL, NULL);
305     g_signal_emit_by_name (webrtc1, "create-offer", NULL, promise);
306   }
307 }
308
309 static void
310 data_channel_on_error (GObject * dc, gpointer user_data)
311 {
312   cleanup_and_quit_loop ("Data channel error", 0);
313 }
314
315 static void
316 data_channel_on_open (GObject * dc, gpointer user_data)
317 {
318   GBytes *bytes = g_bytes_new ("data", strlen ("data"));
319   gst_print ("data channel opened\n");
320   g_signal_emit_by_name (dc, "send-string", "Hi! from GStreamer");
321   g_signal_emit_by_name (dc, "send-data", bytes);
322   g_bytes_unref (bytes);
323 }
324
325 static void
326 data_channel_on_close (GObject * dc, gpointer user_data)
327 {
328   cleanup_and_quit_loop ("Data channel closed", 0);
329 }
330
331 static void
332 data_channel_on_message_string (GObject * dc, gchar * str, gpointer user_data)
333 {
334   gst_print ("Received data channel message: %s\n", str);
335 }
336
337 static void
338 connect_data_channel_signals (GObject * data_channel)
339 {
340   g_signal_connect (data_channel, "on-error",
341       G_CALLBACK (data_channel_on_error), NULL);
342   g_signal_connect (data_channel, "on-open", G_CALLBACK (data_channel_on_open),
343       NULL);
344   g_signal_connect (data_channel, "on-close",
345       G_CALLBACK (data_channel_on_close), NULL);
346   g_signal_connect (data_channel, "on-message-string",
347       G_CALLBACK (data_channel_on_message_string), NULL);
348 }
349
350 static void
351 on_data_channel (GstElement * webrtc, GObject * data_channel,
352     gpointer user_data)
353 {
354   connect_data_channel_signals (data_channel);
355   receive_channel = data_channel;
356 }
357
358 static void
359 on_ice_gathering_state_notify (GstElement * webrtcbin, GParamSpec * pspec,
360     gpointer user_data)
361 {
362   GstWebRTCICEGatheringState ice_gather_state;
363   const gchar *new_state = "unknown";
364
365   g_object_get (webrtcbin, "ice-gathering-state", &ice_gather_state, NULL);
366   switch (ice_gather_state) {
367     case GST_WEBRTC_ICE_GATHERING_STATE_NEW:
368       new_state = "new";
369       break;
370     case GST_WEBRTC_ICE_GATHERING_STATE_GATHERING:
371       new_state = "gathering";
372       break;
373     case GST_WEBRTC_ICE_GATHERING_STATE_COMPLETE:
374       new_state = "complete";
375       break;
376   }
377   gst_print ("ICE gathering state changed to %s\n", new_state);
378 }
379
380 static gboolean webrtcbin_get_stats (GstElement * webrtcbin);
381
382 static gboolean
383 on_webrtcbin_stat (GQuark field_id, const GValue * value, gpointer unused)
384 {
385   if (GST_VALUE_HOLDS_STRUCTURE (value)) {
386     GST_DEBUG ("stat: \'%s\': %" GST_PTR_FORMAT, g_quark_to_string (field_id),
387         gst_value_get_structure (value));
388   } else {
389     GST_FIXME ("unknown field \'%s\' value type: \'%s\'",
390         g_quark_to_string (field_id), g_type_name (G_VALUE_TYPE (value)));
391   }
392
393   return TRUE;
394 }
395
396 static void
397 on_webrtcbin_get_stats (GstPromise * promise, GstElement * webrtcbin)
398 {
399   const GstStructure *stats;
400
401   g_return_if_fail (gst_promise_wait (promise) == GST_PROMISE_RESULT_REPLIED);
402
403   stats = gst_promise_get_reply (promise);
404   gst_structure_foreach (stats, on_webrtcbin_stat, NULL);
405
406   g_timeout_add (100, (GSourceFunc) webrtcbin_get_stats, webrtcbin);
407 }
408
409 static gboolean
410 webrtcbin_get_stats (GstElement * webrtcbin)
411 {
412   GstPromise *promise;
413
414   promise =
415       gst_promise_new_with_change_func (
416       (GstPromiseChangeFunc) on_webrtcbin_get_stats, webrtcbin, NULL);
417
418   GST_TRACE ("emitting get-stats on %" GST_PTR_FORMAT, webrtcbin);
419   g_signal_emit_by_name (webrtcbin, "get-stats", NULL, promise);
420   gst_promise_unref (promise);
421
422   return G_SOURCE_REMOVE;
423 }
424
425 static gboolean
426 bus_watch_cb (GstBus * bus, GstMessage * message, gpointer user_data)
427 {
428   GstPipeline *pipeline = user_data;
429
430   switch (GST_MESSAGE_TYPE (message)) {
431     case GST_MESSAGE_ERROR:
432     {
433       GError *error = NULL;
434       gchar *debug = NULL;
435
436       gst_message_parse_error (message, &error, &debug);
437       cleanup_and_quit_loop ("ERROR: Error on bus", APP_STATE_ERROR);
438       g_warning ("Error on bus: %s (debug: %s)", error->message, debug);
439       g_error_free (error);
440       g_free (debug);
441       break;
442     }
443     case GST_MESSAGE_WARNING:
444     {
445       GError *error = NULL;
446       gchar *debug = NULL;
447
448       gst_message_parse_warning (message, &error, &debug);
449       g_warning ("Warning on bus: %s (debug: %s)", error->message, debug);
450       g_error_free (error);
451       g_free (debug);
452       break;
453     }
454     case GST_MESSAGE_LATENCY:
455       gst_bin_recalculate_latency (GST_BIN (pipeline));
456       break;
457     default:
458       break;
459   }
460
461   return G_SOURCE_CONTINUE;
462 }
463
464 #define STUN_SERVER "stun://stun.l.google.com:19302"
465 #define RTP_TWCC_URI "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01"
466 #define RTP_OPUS_DEFAULT_PT 97
467 #define RTP_VP8_DEFAULT_PT 96
468
469 static gboolean
470 start_pipeline (gboolean create_offer, guint opus_pt, guint vp8_pt)
471 {
472   GstBus *bus;
473   char *audio_desc, *video_desc;
474   GstStateChangeReturn ret;
475   GstWebRTCICE *custom_agent;
476   GError *audio_error = NULL;
477   GError *video_error = NULL;
478
479   pipe1 = gst_pipeline_new ("webrtc-pipeline");
480
481   audio_desc =
482       g_strdup_printf
483       ("audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample"
484       "! queue ! opusenc ! rtpopuspay name=audiopay pt=%u "
485       "! application/x-rtp, encoding-name=OPUS ! queue", opus_pt);
486   audio_bin = gst_parse_bin_from_description (audio_desc, TRUE, &audio_error);
487   g_free (audio_desc);
488   if (audio_error) {
489     gst_printerr ("Failed to parse audio_bin: %s\n", audio_error->message);
490     g_error_free (audio_error);
491     goto err;
492   }
493
494   video_desc =
495       g_strdup_printf
496       ("videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! "
497       /* increase the default keyframe distance, browsers have really long
498        * periods between keyframes and rely on PLI events on packet loss to
499        * fix corrupted video.
500        */
501       "vp8enc deadline=1 keyframe-max-dist=2000 ! "
502       /* picture-id-mode=15-bit seems to make TWCC stats behave better, and
503        * fixes stuttery video playback in Chrome */
504       "rtpvp8pay name=videopay picture-id-mode=15-bit pt=%u ! queue", vp8_pt);
505   video_bin = gst_parse_bin_from_description (video_desc, TRUE, &video_error);
506   g_free (video_desc);
507   if (video_error) {
508     gst_printerr ("Failed to parse video_bin: %s\n", video_error->message);
509     g_error_free (video_error);
510     goto err;
511   }
512
513   if (custom_ice) {
514     custom_agent = GST_WEBRTC_ICE (customice_agent_new ("custom"));
515     webrtc1 = gst_element_factory_make_full ("webrtcbin", "name", "sendrecv",
516         "stun-server", STUN_SERVER, "ice-agent", custom_agent, NULL);
517   } else {
518     webrtc1 = gst_element_factory_make_full ("webrtcbin", "name", "sendrecv",
519         "stun-server", STUN_SERVER, NULL);
520   }
521   g_assert_nonnull (webrtc1);
522   gst_util_set_object_arg (G_OBJECT (webrtc1), "bundle-policy", "max-bundle");
523
524   /* Takes ownership of each: */
525   gst_bin_add_many (GST_BIN (pipe1), audio_bin, video_bin, webrtc1, NULL);
526
527   if (!gst_element_link (audio_bin, webrtc1)) {
528     gst_printerr ("Failed to link audio_bin \n");
529   }
530   if (!gst_element_link (video_bin, webrtc1)) {
531     gst_printerr ("Failed to link video_bin \n");
532   }
533
534   if (!create_offer) {
535     /* XXX: this will fail when the remote offers twcc as the extension id
536      * cannot currently be negotiated when receiving an offer.
537      */
538     GST_FIXME ("Need to implement header extension negotiation when "
539         "reciving a remote offers");
540   } else {
541     GstElement *videopay, *audiopay;
542     GstRTPHeaderExtension *video_twcc, *audio_twcc;
543
544     videopay = gst_bin_get_by_name (GST_BIN (pipe1), "videopay");
545     g_assert_nonnull (videopay);
546     video_twcc = gst_rtp_header_extension_create_from_uri (RTP_TWCC_URI);
547     g_assert_nonnull (video_twcc);
548     gst_rtp_header_extension_set_id (video_twcc, 1);
549     g_signal_emit_by_name (videopay, "add-extension", video_twcc);
550     g_clear_object (&video_twcc);
551     g_clear_object (&videopay);
552
553     audiopay = gst_bin_get_by_name (GST_BIN (pipe1), "audiopay");
554     g_assert_nonnull (audiopay);
555     audio_twcc = gst_rtp_header_extension_create_from_uri (RTP_TWCC_URI);
556     g_assert_nonnull (audio_twcc);
557     gst_rtp_header_extension_set_id (audio_twcc, 1);
558     g_signal_emit_by_name (audiopay, "add-extension", audio_twcc);
559     g_clear_object (&audio_twcc);
560     g_clear_object (&audiopay);
561   }
562
563   /* This is the gstwebrtc entry point where we create the offer and so on. It
564    * will be called when the pipeline goes to PLAYING. */
565   g_signal_connect (webrtc1, "on-negotiation-needed",
566       G_CALLBACK (on_negotiation_needed), GINT_TO_POINTER (create_offer));
567   /* We need to transmit this ICE candidate to the browser via the websockets
568    * signalling server. Incoming ice candidates from the browser need to be
569    * added by us too, see on_server_message() */
570   g_signal_connect (webrtc1, "on-ice-candidate",
571       G_CALLBACK (send_ice_candidate_message), NULL);
572   g_signal_connect (webrtc1, "notify::ice-gathering-state",
573       G_CALLBACK (on_ice_gathering_state_notify), NULL);
574
575   bus = gst_pipeline_get_bus (GST_PIPELINE (pipe1));
576   gst_bus_add_watch (bus, bus_watch_cb, pipe1);
577   gst_object_unref (bus);
578
579   gst_element_set_state (pipe1, GST_STATE_READY);
580
581   g_signal_emit_by_name (webrtc1, "create-data-channel", "channel", NULL,
582       &send_channel);
583   if (send_channel) {
584     gst_print ("Created data channel\n");
585     connect_data_channel_signals (send_channel);
586   } else {
587     gst_print ("Could not create data channel, is usrsctp available?\n");
588   }
589
590   g_signal_connect (webrtc1, "on-data-channel", G_CALLBACK (on_data_channel),
591       NULL);
592   /* Incoming streams will be exposed via this signal */
593   g_signal_connect (webrtc1, "pad-added", G_CALLBACK (on_incoming_stream),
594       pipe1);
595
596   g_timeout_add (100, (GSourceFunc) webrtcbin_get_stats, webrtc1);
597
598   gst_print ("Starting pipeline\n");
599   ret = gst_element_set_state (GST_ELEMENT (pipe1), GST_STATE_PLAYING);
600   if (ret == GST_STATE_CHANGE_FAILURE)
601     goto err;
602
603   return TRUE;
604
605 err:
606   if (pipe1)
607     g_clear_object (&pipe1);
608   if (webrtc1)
609     webrtc1 = NULL;
610   return FALSE;
611 }
612
613 static gboolean
614 setup_call (void)
615 {
616   gchar *msg;
617
618   if (soup_websocket_connection_get_state (ws_conn) !=
619       SOUP_WEBSOCKET_STATE_OPEN)
620     return FALSE;
621
622   if (!peer_id)
623     return FALSE;
624
625   gst_print ("Setting up signalling server call with %s\n", peer_id);
626   app_state = PEER_CONNECTING;
627   msg = g_strdup_printf ("SESSION %s", peer_id);
628   soup_websocket_connection_send_text (ws_conn, msg);
629   g_free (msg);
630   return TRUE;
631 }
632
633 static gboolean
634 register_with_server (void)
635 {
636   gchar *hello;
637
638   if (soup_websocket_connection_get_state (ws_conn) !=
639       SOUP_WEBSOCKET_STATE_OPEN)
640     return FALSE;
641
642   if (!our_id) {
643     gint32 id;
644
645     id = g_random_int_range (10, 10000);
646     gst_print ("Registering id %i with server\n", id);
647
648     hello = g_strdup_printf ("HELLO %i", id);
649   } else {
650     gst_print ("Registering id %s with server\n", our_id);
651
652     hello = g_strdup_printf ("HELLO %s", our_id);
653   }
654
655   app_state = SERVER_REGISTERING;
656
657   /* Register with the server with a random integer id. Reply will be received
658    * by on_server_message() */
659   soup_websocket_connection_send_text (ws_conn, hello);
660   g_free (hello);
661
662   return TRUE;
663 }
664
665 static void
666 on_server_closed (SoupWebsocketConnection * conn G_GNUC_UNUSED,
667     gpointer user_data G_GNUC_UNUSED)
668 {
669   app_state = SERVER_CLOSED;
670   cleanup_and_quit_loop ("Server connection closed", 0);
671 }
672
673 /* Answer created by our pipeline, to be sent to the peer */
674 static void
675 on_answer_created (GstPromise * promise, gpointer user_data)
676 {
677   GstWebRTCSessionDescription *answer = NULL;
678   const GstStructure *reply;
679
680   g_assert_cmphex (app_state, ==, PEER_CALL_NEGOTIATING);
681
682   g_assert_cmphex (gst_promise_wait (promise), ==, GST_PROMISE_RESULT_REPLIED);
683   reply = gst_promise_get_reply (promise);
684   gst_structure_get (reply, "answer",
685       GST_TYPE_WEBRTC_SESSION_DESCRIPTION, &answer, NULL);
686   gst_promise_unref (promise);
687
688   promise = gst_promise_new ();
689   g_signal_emit_by_name (webrtc1, "set-local-description", answer, promise);
690   gst_promise_interrupt (promise);
691   gst_promise_unref (promise);
692
693   /* Send answer to peer */
694   send_sdp_to_peer (answer);
695   gst_webrtc_session_description_free (answer);
696 }
697
698 static void
699 on_offer_set (GstPromise * promise, gpointer user_data)
700 {
701   gst_promise_unref (promise);
702   promise = gst_promise_new_with_change_func (on_answer_created, NULL, NULL);
703   g_signal_emit_by_name (webrtc1, "create-answer", NULL, promise);
704 }
705
706 static void
707 on_offer_received (GstSDPMessage * sdp)
708 {
709   GstWebRTCSessionDescription *offer = NULL;
710   GstPromise *promise;
711
712   /* If we got an offer and we have no webrtcbin, we need to parse the SDP,
713    * get the payload types, then start the pipeline */
714   if (!webrtc1 && our_id) {
715     guint medias_len, formats_len;
716     guint opus_pt = 0, vp8_pt = 0;
717
718     gst_println ("Parsing offer to find payload types");
719
720     medias_len = gst_sdp_message_medias_len (sdp);
721     for (int i = 0; i < medias_len; i++) {
722       const GstSDPMedia *media = gst_sdp_message_get_media (sdp, i);
723       formats_len = gst_sdp_media_formats_len (media);
724       for (int j = 0; j < formats_len; j++) {
725         guint pt;
726         GstCaps *caps;
727         GstStructure *s;
728         const char *fmt, *encoding_name;
729
730         fmt = gst_sdp_media_get_format (media, j);
731         if (g_strcmp0 (fmt, "webrtc-datachannel") == 0)
732           continue;
733         pt = atoi (fmt);
734         caps = gst_sdp_media_get_caps_from_media (media, pt);
735         s = gst_caps_get_structure (caps, 0);
736         encoding_name = gst_structure_get_string (s, "encoding-name");
737         if (vp8_pt == 0 && g_strcmp0 (encoding_name, "VP8") == 0)
738           vp8_pt = pt;
739         if (opus_pt == 0 && g_strcmp0 (encoding_name, "OPUS") == 0)
740           opus_pt = pt;
741       }
742     }
743
744     g_assert_cmpint (opus_pt, !=, 0);
745     g_assert_cmpint (vp8_pt, !=, 0);
746
747     gst_println ("Starting pipeline with opus pt: %u vp8 pt: %u", opus_pt,
748         vp8_pt);
749
750     if (!start_pipeline (FALSE, opus_pt, vp8_pt)) {
751       cleanup_and_quit_loop ("ERROR: failed to start pipeline",
752           PEER_CALL_ERROR);
753     }
754   }
755
756   offer = gst_webrtc_session_description_new (GST_WEBRTC_SDP_TYPE_OFFER, sdp);
757   g_assert_nonnull (offer);
758
759   /* Set remote description on our pipeline */
760   {
761     promise = gst_promise_new_with_change_func (on_offer_set, NULL, NULL);
762     g_signal_emit_by_name (webrtc1, "set-remote-description", offer, promise);
763   }
764   gst_webrtc_session_description_free (offer);
765 }
766
767 /* One mega message handler for our asynchronous calling mechanism */
768 static void
769 on_server_message (SoupWebsocketConnection * conn, SoupWebsocketDataType type,
770     GBytes * message, gpointer user_data)
771 {
772   gchar *text;
773
774   switch (type) {
775     case SOUP_WEBSOCKET_DATA_BINARY:
776       gst_printerr ("Received unknown binary message, ignoring\n");
777       return;
778     case SOUP_WEBSOCKET_DATA_TEXT:{
779       gsize size;
780       const gchar *data = g_bytes_get_data (message, &size);
781       /* Convert to NULL-terminated string */
782       text = g_strndup (data, size);
783       break;
784     }
785     default:
786       g_assert_not_reached ();
787   }
788
789   if (g_strcmp0 (text, "HELLO") == 0) {
790     /* Server has accepted our registration, we are ready to send commands */
791     if (app_state != SERVER_REGISTERING) {
792       cleanup_and_quit_loop ("ERROR: Received HELLO when not registering",
793           APP_STATE_ERROR);
794       goto out;
795     }
796     app_state = SERVER_REGISTERED;
797     gst_print ("Registered with server\n");
798     if (!our_id) {
799       /* Ask signalling server to connect us with a specific peer */
800       if (!setup_call ()) {
801         cleanup_and_quit_loop ("ERROR: Failed to setup call", PEER_CALL_ERROR);
802         goto out;
803       }
804     } else {
805       gst_println ("Waiting for connection from peer (our-id: %s)", our_id);
806     }
807   } else if (g_strcmp0 (text, "SESSION_OK") == 0) {
808     /* The call initiated by us has been setup by the server; now we can start
809      * negotiation */
810     if (app_state != PEER_CONNECTING) {
811       cleanup_and_quit_loop ("ERROR: Received SESSION_OK when not calling",
812           PEER_CONNECTION_ERROR);
813       goto out;
814     }
815
816     app_state = PEER_CONNECTED;
817     /* Start negotiation (exchange SDP and ICE candidates) */
818     if (!start_pipeline (TRUE, RTP_OPUS_DEFAULT_PT, RTP_VP8_DEFAULT_PT))
819       cleanup_and_quit_loop ("ERROR: failed to start pipeline",
820           PEER_CALL_ERROR);
821   } else if (g_strcmp0 (text, "OFFER_REQUEST") == 0) {
822     if (app_state != SERVER_REGISTERED) {
823       gst_printerr ("Received OFFER_REQUEST at a strange time, ignoring\n");
824       goto out;
825     }
826     gst_print ("Received OFFER_REQUEST, sending offer\n");
827     /* Peer wants us to start negotiation (exchange SDP and ICE candidates) */
828     if (!start_pipeline (TRUE, RTP_OPUS_DEFAULT_PT, RTP_VP8_DEFAULT_PT))
829       cleanup_and_quit_loop ("ERROR: failed to start pipeline",
830           PEER_CALL_ERROR);
831   } else if (g_str_has_prefix (text, "ERROR")) {
832     /* Handle errors */
833     switch (app_state) {
834       case SERVER_CONNECTING:
835         app_state = SERVER_CONNECTION_ERROR;
836         break;
837       case SERVER_REGISTERING:
838         app_state = SERVER_REGISTRATION_ERROR;
839         break;
840       case PEER_CONNECTING:
841         app_state = PEER_CONNECTION_ERROR;
842         break;
843       case PEER_CONNECTED:
844       case PEER_CALL_NEGOTIATING:
845         app_state = PEER_CALL_ERROR;
846         break;
847       default:
848         app_state = APP_STATE_ERROR;
849     }
850     cleanup_and_quit_loop (text, 0);
851   } else {
852     /* Look for JSON messages containing SDP and ICE candidates */
853     JsonNode *root;
854     JsonObject *object, *child;
855     JsonParser *parser = json_parser_new ();
856     if (!json_parser_load_from_data (parser, text, -1, NULL)) {
857       gst_printerr ("Unknown message '%s', ignoring\n", text);
858       g_object_unref (parser);
859       goto out;
860     }
861
862     root = json_parser_get_root (parser);
863     if (!JSON_NODE_HOLDS_OBJECT (root)) {
864       gst_printerr ("Unknown json message '%s', ignoring\n", text);
865       g_object_unref (parser);
866       goto out;
867     }
868
869     object = json_node_get_object (root);
870     /* Check type of JSON message */
871     if (json_object_has_member (object, "sdp")) {
872       int ret;
873       GstSDPMessage *sdp;
874       const gchar *text, *sdptype;
875       GstWebRTCSessionDescription *answer;
876
877       app_state = PEER_CALL_NEGOTIATING;
878
879       child = json_object_get_object_member (object, "sdp");
880
881       if (!json_object_has_member (child, "type")) {
882         cleanup_and_quit_loop ("ERROR: received SDP without 'type'",
883             PEER_CALL_ERROR);
884         goto out;
885       }
886
887       sdptype = json_object_get_string_member (child, "type");
888       /* In this example, we create the offer and receive one answer by default,
889        * but it's possible to comment out the offer creation and wait for an offer
890        * instead, so we handle either here.
891        *
892        * See tests/examples/webrtcbidirectional.c in gst-plugins-bad for another
893        * example how to handle offers from peers and reply with answers using webrtcbin. */
894       text = json_object_get_string_member (child, "sdp");
895       ret = gst_sdp_message_new (&sdp);
896       g_assert_cmphex (ret, ==, GST_SDP_OK);
897       ret = gst_sdp_message_parse_buffer ((guint8 *) text, strlen (text), sdp);
898       g_assert_cmphex (ret, ==, GST_SDP_OK);
899
900       if (g_str_equal (sdptype, "answer")) {
901         gst_print ("Received answer:\n%s\n", text);
902         answer = gst_webrtc_session_description_new (GST_WEBRTC_SDP_TYPE_ANSWER,
903             sdp);
904         g_assert_nonnull (answer);
905
906         /* Set remote description on our pipeline */
907         {
908           GstPromise *promise = gst_promise_new ();
909           g_signal_emit_by_name (webrtc1, "set-remote-description", answer,
910               promise);
911           gst_promise_interrupt (promise);
912           gst_promise_unref (promise);
913         }
914         app_state = PEER_CALL_STARTED;
915       } else {
916         gst_print ("Received offer:\n%s\n", text);
917         on_offer_received (sdp);
918       }
919
920     } else if (json_object_has_member (object, "ice")) {
921       const gchar *candidate;
922       gint sdpmlineindex;
923
924       child = json_object_get_object_member (object, "ice");
925       candidate = json_object_get_string_member (child, "candidate");
926       sdpmlineindex = json_object_get_int_member (child, "sdpMLineIndex");
927
928       /* Add ice candidate sent by remote peer */
929       g_signal_emit_by_name (webrtc1, "add-ice-candidate", sdpmlineindex,
930           candidate);
931     } else {
932       gst_printerr ("Ignoring unknown JSON message:\n%s\n", text);
933     }
934     g_object_unref (parser);
935   }
936
937 out:
938   g_free (text);
939 }
940
941 static void
942 on_server_connected (SoupSession * session, GAsyncResult * res,
943     SoupMessage * msg)
944 {
945   GError *error = NULL;
946
947   ws_conn = soup_session_websocket_connect_finish (session, res, &error);
948   if (error) {
949     cleanup_and_quit_loop (error->message, SERVER_CONNECTION_ERROR);
950     g_error_free (error);
951     return;
952   }
953
954   g_assert_nonnull (ws_conn);
955
956   app_state = SERVER_CONNECTED;
957   gst_print ("Connected to signalling server\n");
958
959   g_signal_connect (ws_conn, "closed", G_CALLBACK (on_server_closed), NULL);
960   g_signal_connect (ws_conn, "message", G_CALLBACK (on_server_message), NULL);
961
962   /* Register with the server so it knows about us and can accept commands */
963   register_with_server ();
964 }
965
966 /*
967  * Connect to the signalling server. This is the entrypoint for everything else.
968  */
969 static void
970 connect_to_websocket_server_async (void)
971 {
972   SoupLogger *logger;
973   SoupMessage *message;
974   SoupSession *session;
975   const char *https_aliases[] = { "wss", NULL };
976
977   session =
978       soup_session_new_with_options (SOUP_SESSION_SSL_STRICT, !disable_ssl,
979       SOUP_SESSION_SSL_USE_SYSTEM_CA_FILE, TRUE,
980       //SOUP_SESSION_SSL_CA_FILE, "/etc/ssl/certs/ca-bundle.crt",
981       SOUP_SESSION_HTTPS_ALIASES, https_aliases, NULL);
982
983   logger = soup_logger_new (SOUP_LOGGER_LOG_BODY, -1);
984   soup_session_add_feature (session, SOUP_SESSION_FEATURE (logger));
985   g_object_unref (logger);
986
987   message = soup_message_new (SOUP_METHOD_GET, server_url);
988
989   gst_print ("Connecting to server...\n");
990
991   /* Once connected, we will register */
992   soup_session_websocket_connect_async (session, message, NULL, NULL, NULL,
993       (GAsyncReadyCallback) on_server_connected, message);
994   app_state = SERVER_CONNECTING;
995 }
996
997 static gboolean
998 check_plugins (void)
999 {
1000   int i;
1001   gboolean ret;
1002   GstPlugin *plugin;
1003   GstRegistry *registry;
1004   const gchar *needed[] = { "opus", "vpx", "nice", "webrtc", "dtls", "srtp",
1005     "rtpmanager", "videotestsrc", "audiotestsrc", NULL
1006   };
1007
1008   registry = gst_registry_get ();
1009   ret = TRUE;
1010   for (i = 0; i < g_strv_length ((gchar **) needed); i++) {
1011     plugin = gst_registry_find_plugin (registry, needed[i]);
1012     if (!plugin) {
1013       gst_print ("Required gstreamer plugin '%s' not found\n", needed[i]);
1014       ret = FALSE;
1015       continue;
1016     }
1017     gst_object_unref (plugin);
1018   }
1019   return ret;
1020 }
1021
1022 int
1023 main (int argc, char *argv[])
1024 {
1025   GOptionContext *context;
1026   GError *error = NULL;
1027   int ret_code = -1;
1028
1029   context = g_option_context_new ("- gstreamer webrtc sendrecv demo");
1030   g_option_context_add_main_entries (context, entries, NULL);
1031   g_option_context_add_group (context, gst_init_get_option_group ());
1032   if (!g_option_context_parse (context, &argc, &argv, &error)) {
1033     gst_printerr ("Error initializing: %s\n", error->message);
1034     return -1;
1035   }
1036
1037   GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "webrtc-sendrecv", 0,
1038       "WebRTC Sending and Receiving example");
1039
1040   if (!check_plugins ()) {
1041     goto out;
1042   }
1043
1044   if (!peer_id && !our_id) {
1045     gst_printerr ("--peer-id or --our-id is a required argument\n");
1046     goto out;
1047   }
1048
1049   if (peer_id && our_id) {
1050     gst_printerr ("specify only --peer-id or --our-id\n");
1051     goto out;
1052   }
1053
1054   ret_code = 0;
1055
1056   /* Disable ssl when running a localhost server, because
1057    * it's probably a test server with a self-signed certificate */
1058   {
1059     GstUri *uri = gst_uri_from_string (server_url);
1060     if (g_strcmp0 ("localhost", gst_uri_get_host (uri)) == 0 ||
1061         g_strcmp0 ("127.0.0.1", gst_uri_get_host (uri)) == 0)
1062       disable_ssl = TRUE;
1063     gst_uri_unref (uri);
1064   }
1065
1066   loop = g_main_loop_new (NULL, FALSE);
1067
1068   connect_to_websocket_server_async ();
1069
1070   g_main_loop_run (loop);
1071
1072   if (loop)
1073     g_main_loop_unref (loop);
1074
1075   if (pipe1) {
1076     GstBus *bus;
1077
1078     gst_element_set_state (GST_ELEMENT (pipe1), GST_STATE_NULL);
1079     gst_print ("Pipeline stopped\n");
1080
1081     bus = gst_pipeline_get_bus (GST_PIPELINE (pipe1));
1082     gst_bus_remove_watch (bus);
1083     gst_object_unref (bus);
1084
1085     gst_object_unref (pipe1);
1086   }
1087
1088 out:
1089   g_free (peer_id);
1090   g_free (our_id);
1091
1092   return ret_code;
1093 }