datachannel: Notify low buffered amount according to spec
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-bad / ext / webrtc / webrtcdatachannel.c
index e3877f8..89fbd7c 100644 (file)
@@ -50,6 +50,9 @@ G_DEFINE_TYPE_WITH_CODE (WebRTCDataChannel, webrtc_data_channel,
     GST_DEBUG_CATEGORY_INIT (webrtc_data_channel_debug, "webrtcdatachannel", 0,
         "webrtcdatachannel"););
 
+G_LOCK_DEFINE_STATIC (outstanding_channels_lock);
+static GList *outstanding_channels;
+
 typedef enum
 {
   DATA_CHANNEL_PPID_WEBRTC_CONTROL = 50,
@@ -413,8 +416,8 @@ _parse_control_packet (WebRTCDataChannel * channel, guint8 * data,
     GST_INFO_OBJECT (channel, "Received channel open");
 
     if (channel->parent.negotiated) {
-      g_set_error (error, GST_WEBRTC_BIN_ERROR,
-          GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+      g_set_error (error, GST_WEBRTC_ERROR,
+          GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
           "Data channel was signalled as negotiated already");
       g_return_val_if_reached (GST_FLOW_ERROR);
     }
@@ -479,16 +482,15 @@ _parse_control_packet (WebRTCDataChannel * channel, guint8 * data,
 
     ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
     if (ret != GST_FLOW_OK) {
-      g_set_error (error, GST_WEBRTC_BIN_ERROR,
-          GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
-          "Could not send ack packet");
+      g_set_error (error, GST_WEBRTC_ERROR,
+          GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Could not send ack packet");
       return ret;
     }
 
     return ret;
   } else {
-    g_set_error (error, GST_WEBRTC_BIN_ERROR,
-        GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+    g_set_error (error, GST_WEBRTC_ERROR,
+        GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
         "Unknown message type in control protocol");
     return GST_FLOW_ERROR;
   }
@@ -497,8 +499,8 @@ parse_error:
   {
     g_free (label);
     g_free (proto);
-    g_set_error (error, GST_WEBRTC_BIN_ERROR,
-        GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "Failed to parse packet");
+    g_set_error (error, GST_WEBRTC_ERROR,
+        GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Failed to parse packet");
     g_return_val_if_reached (GST_FLOW_ERROR);
   }
 }
@@ -550,14 +552,14 @@ _data_channel_have_sample (WebRTCDataChannel * channel, GstSample * sample,
 
   buffer = gst_sample_get_buffer (sample);
   if (!buffer) {
-    g_set_error (error, GST_WEBRTC_BIN_ERROR,
-        GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "No buffer to handle");
+    g_set_error (error, GST_WEBRTC_ERROR,
+        GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "No buffer to handle");
     return GST_FLOW_ERROR;
   }
   receive = gst_sctp_buffer_get_receive_meta (buffer);
   if (!receive) {
-    g_set_error (error, GST_WEBRTC_BIN_ERROR,
-        GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+    g_set_error (error, GST_WEBRTC_ERROR,
+        GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
         "No SCTP Receive meta on the buffer");
     return GST_FLOW_ERROR;
   }
@@ -566,8 +568,8 @@ _data_channel_have_sample (WebRTCDataChannel * channel, GstSample * sample,
     case DATA_CHANNEL_PPID_WEBRTC_CONTROL:{
       GstMapInfo info = GST_MAP_INFO_INIT;
       if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
-        g_set_error (error, GST_WEBRTC_BIN_ERROR,
-            GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+        g_set_error (error, GST_WEBRTC_ERROR,
+            GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
             "Failed to map received buffer");
         ret = GST_FLOW_ERROR;
       } else {
@@ -580,8 +582,8 @@ _data_channel_have_sample (WebRTCDataChannel * channel, GstSample * sample,
     case DATA_CHANNEL_PPID_WEBRTC_STRING_PARTIAL:{
       GstMapInfo info = GST_MAP_INFO_INIT;
       if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
-        g_set_error (error, GST_WEBRTC_BIN_ERROR,
-            GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+        g_set_error (error, GST_WEBRTC_ERROR,
+            GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
             "Failed to map received buffer");
         ret = GST_FLOW_ERROR;
       } else {
@@ -596,8 +598,8 @@ _data_channel_have_sample (WebRTCDataChannel * channel, GstSample * sample,
     case DATA_CHANNEL_PPID_WEBRTC_BINARY_PARTIAL:{
       struct map_info *info = g_new0 (struct map_info, 1);
       if (!gst_buffer_map (buffer, &info->map_info, GST_MAP_READ)) {
-        g_set_error (error, GST_WEBRTC_BIN_ERROR,
-            GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+        g_set_error (error, GST_WEBRTC_ERROR,
+            GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
             "Failed to map received buffer");
         ret = GST_FLOW_ERROR;
       } else {
@@ -618,8 +620,8 @@ _data_channel_have_sample (WebRTCDataChannel * channel, GstSample * sample,
           NULL);
       break;
     default:
-      g_set_error (error, GST_WEBRTC_BIN_ERROR,
-          GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+      g_set_error (error, GST_WEBRTC_ERROR,
+          GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
           "Unknown SCTP PPID %u received", receive->ppid);
       ret = GST_FLOW_ERROR;
       break;
@@ -705,6 +707,7 @@ webrtc_data_channel_start_negotiation (WebRTCDataChannel * channel)
   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
   channel->parent.buffered_amount += gst_buffer_get_size (buffer);
   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
+  g_object_notify (G_OBJECT (&channel->parent), "buffered-amount");
 
   if (gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc),
           buffer) == GST_FLOW_OK) {
@@ -712,8 +715,8 @@ webrtc_data_channel_start_negotiation (WebRTCDataChannel * channel)
     _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
   } else {
     GError *error = NULL;
-    g_set_error (&error, GST_WEBRTC_BIN_ERROR,
-        GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+    g_set_error (&error, GST_WEBRTC_ERROR,
+        GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
         "Failed to send DCEP open packet");
     _channel_store_error (channel, error);
     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
@@ -764,8 +767,8 @@ webrtc_data_channel_send_data (GstWebRTCDataChannel * base_channel,
     g_return_if_fail (data != NULL);
     if (!_is_within_max_message_size (channel, size)) {
       GError *error = NULL;
-      g_set_error (&error, GST_WEBRTC_BIN_ERROR,
-          GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+      g_set_error (&error, GST_WEBRTC_ERROR,
+          GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
           "Requested to send data that is too large");
       _channel_store_error (channel, error);
       _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL,
@@ -788,13 +791,14 @@ webrtc_data_channel_send_data (GstWebRTCDataChannel * base_channel,
   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
   channel->parent.buffered_amount += gst_buffer_get_size (buffer);
   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
+  g_object_notify (G_OBJECT (&channel->parent), "buffered-amount");
 
   ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
 
   if (ret != GST_FLOW_OK) {
     GError *error = NULL;
-    g_set_error (&error, GST_WEBRTC_BIN_ERROR,
-        GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "Failed to send data");
+    g_set_error (&error, GST_WEBRTC_ERROR,
+        GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Failed to send data");
     _channel_store_error (channel, error);
     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
   }
@@ -820,12 +824,12 @@ webrtc_data_channel_send_string (GstWebRTCDataChannel * base_channel,
     ppid = DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY;
   } else {
     gsize size = strlen (str);
-    gchar *str_copy = g_strdup (str);
+    gchar *str_copy;
 
     if (!_is_within_max_message_size (channel, size)) {
       GError *error = NULL;
-      g_set_error (&error, GST_WEBRTC_BIN_ERROR,
-          GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+      g_set_error (&error, GST_WEBRTC_ERROR,
+          GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
           "Requested to send a string that is too large");
       _channel_store_error (channel, error);
       _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL,
@@ -833,6 +837,7 @@ webrtc_data_channel_send_string (GstWebRTCDataChannel * base_channel,
       return;
     }
 
+    str_copy = g_strdup (str);
     buffer =
         gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, str_copy,
         size, 0, size, str_copy, g_free);
@@ -849,13 +854,14 @@ webrtc_data_channel_send_string (GstWebRTCDataChannel * base_channel,
   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
   channel->parent.buffered_amount += gst_buffer_get_size (buffer);
   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
+  g_object_notify (G_OBJECT (&channel->parent), "buffered-amount");
 
   ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
 
   if (ret != GST_FLOW_OK) {
     GError *error = NULL;
-    g_set_error (&error, GST_WEBRTC_BIN_ERROR,
-        GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "Failed to send string");
+    g_set_error (&error, GST_WEBRTC_ERROR,
+        GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Failed to send string");
     _channel_store_error (channel, error);
     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
   }
@@ -874,13 +880,37 @@ _on_sctp_notify_state_unlocked (GObject * sctp_transport,
   }
 }
 
+static WebRTCDataChannel *
+ensure_channel_alive (WebRTCDataChannel * channel)
+{
+  /* ghetto impl of, does the channel still exist?.
+   * Needed because g_signal_handler_disconnect*() will not disconnect any
+   * running functions and _finalize() implementation can complete and
+   * invalidate channel */
+  G_LOCK (outstanding_channels_lock);
+  if (g_list_find (outstanding_channels, channel)) {
+    g_object_ref (channel);
+  } else {
+    G_UNLOCK (outstanding_channels_lock);
+    return NULL;
+  }
+  G_UNLOCK (outstanding_channels_lock);
+
+  return channel;
+}
+
 static void
 _on_sctp_notify_state (GObject * sctp_transport, GParamSpec * pspec,
     WebRTCDataChannel * channel)
 {
+  if (!(channel = ensure_channel_alive (channel)))
+    return;
+
   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
   _on_sctp_notify_state_unlocked (sctp_transport, channel);
   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
+
+  g_object_unref (channel);
 }
 
 static void
@@ -915,7 +945,7 @@ on_appsrc_data (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
         channel->parent.buffered_amount_low_threshold,
         channel->parent.buffered_amount);
     if (prev_amount >= channel->parent.buffered_amount_low_threshold
-        && channel->parent.buffered_amount <
+        && channel->parent.buffered_amount <=
         channel->parent.buffered_amount_low_threshold) {
       _channel_enqueue_task (channel, (ChannelTask) _emit_low_threshold, NULL,
           NULL);
@@ -927,6 +957,7 @@ on_appsrc_data (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
           NULL);
     }
     GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
+    g_object_notify (G_OBJECT (&channel->parent), "buffered-amount");
   }
 
   return GST_PAD_PROBE_OK;
@@ -960,6 +991,16 @@ gst_webrtc_data_channel_constructed (GObject * object)
 }
 
 static void
+gst_webrtc_data_channel_dispose (GObject * object)
+{
+  G_LOCK (outstanding_channels_lock);
+  outstanding_channels = g_list_remove (outstanding_channels, object);
+  G_UNLOCK (outstanding_channels_lock);
+
+  G_OBJECT_CLASS (parent_class)->dispose (object);
+}
+
+static void
 gst_webrtc_data_channel_finalize (GObject * object)
 {
   WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (object);
@@ -989,6 +1030,7 @@ webrtc_data_channel_class_init (WebRTCDataChannelClass * klass)
       (GstWebRTCDataChannelClass *) klass;
 
   gobject_class->constructed = gst_webrtc_data_channel_constructed;
+  gobject_class->dispose = gst_webrtc_data_channel_dispose;
   gobject_class->finalize = gst_webrtc_data_channel_finalize;
 
   channel_class->send_data = webrtc_data_channel_send_data;
@@ -999,6 +1041,9 @@ webrtc_data_channel_class_init (WebRTCDataChannelClass * klass)
 static void
 webrtc_data_channel_init (WebRTCDataChannel * channel)
 {
+  G_LOCK (outstanding_channels_lock);
+  outstanding_channels = g_list_prepend (outstanding_channels, channel);
+  G_UNLOCK (outstanding_channels_lock);
 }
 
 static void
@@ -1011,6 +1056,7 @@ _data_channel_set_sctp_transport (WebRTCDataChannel * channel,
   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
   if (channel->sctp_transport)
     g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel);
+  GST_TRACE ("%p set sctp %p", channel, sctp);
 
   gst_object_replace ((GstObject **) & channel->sctp_transport,
       GST_OBJECT (sctp));