urisourcebin: Fix buffering message aggregation.
authorJan Schmidt <jan@centricular.com>
Fri, 17 Mar 2017 13:54:55 +0000 (00:54 +1100)
committerJan Schmidt <jan@centricular.com>
Fri, 17 Mar 2017 15:03:47 +0000 (02:03 +1100)
Add locking, and handle EOS properly now that urisourcebin
uses custom events in place of real EOS events, so we
need to manually remove buffering messages and potentially
post 100% in that situation

gst/playback/gsturisourcebin.c

index 54ec197..4f7cc5c 100644 (file)
@@ -73,6 +73,23 @@ typedef struct _OutputSlotInfo OutputSlotInfo;
 #define GST_URI_SOURCE_BIN_LOCK(dec) (g_mutex_lock(&((GstURISourceBin*)(dec))->lock))
 #define GST_URI_SOURCE_BIN_UNLOCK(dec) (g_mutex_unlock(&((GstURISourceBin*)(dec))->lock))
 
+#define BUFFERING_LOCK(ubin) G_STMT_START {                            \
+    GST_LOG_OBJECT (ubin,                                              \
+                   "buffering locking from thread %p",                 \
+                   g_thread_self ());                                  \
+    g_mutex_lock (&GST_URI_SOURCE_BIN_CAST(ubin)->buffering_lock);             \
+    GST_LOG_OBJECT (ubin,                                              \
+                   "buffering lock from thread %p",                    \
+                   g_thread_self ());                                  \
+} G_STMT_END
+
+#define BUFFERING_UNLOCK(ubin) G_STMT_START {                          \
+    GST_LOG_OBJECT (ubin,                                              \
+                   "buffering unlocking from thread %p",               \
+                   g_thread_self ());                                  \
+    g_mutex_unlock (&GST_URI_SOURCE_BIN_CAST(ubin)->buffering_lock);           \
+} G_STMT_END
+
 /* Track a source pad from a child that
  * is linked or needs linking to an output
  * slot */
@@ -144,6 +161,8 @@ struct _GstURISourceBin
 
   GList *buffering_status;      /* element currently buffering messages */
   gint last_buffering_pct;      /* Avoid sending buffering over and over */
+  GMutex buffering_lock;
+  GMutex buffering_post_lock;
 };
 
 struct _GstURISourceBinClass
@@ -725,6 +744,9 @@ gst_uri_source_bin_init (GstURISourceBin * urisrc)
 
   g_mutex_init (&urisrc->lock);
 
+  g_mutex_init (&urisrc->buffering_lock);
+  g_mutex_init (&urisrc->buffering_post_lock);
+
   urisrc->uri = g_strdup (DEFAULT_PROP_URI);
   urisrc->connection_speed = DEFAULT_CONNECTION_SPEED;
 
@@ -1002,6 +1024,9 @@ link_pending_pad_to_output (GstURISourceBin * urisrc, OutputSlotInfo * slot)
   /* Look for a suitable pending pad */
   cur_caps = gst_pad_get_current_caps (slot->sinkpad);
 
+  GST_DEBUG_OBJECT (urisrc,
+      "Looking for a pending pad with caps %" GST_PTR_FORMAT, cur_caps);
+
   for (cur = urisrc->pending_pads; cur != NULL; cur = g_list_next (cur)) {
     GstPad *pending = (GstPad *) (cur->data);
     ChildSrcPadInfo *cur_info = NULL;
@@ -1029,8 +1054,8 @@ link_pending_pad_to_output (GstURISourceBin * urisrc, OutputSlotInfo * slot)
     guint block_id =
         gst_pad_add_probe (slot->sinkpad, GST_PAD_PROBE_TYPE_BLOCK_UPSTREAM,
         NULL, NULL, NULL);
-    GST_DEBUG_OBJECT (urisrc, "Linking pending pad to existing output slot %p",
-        slot);
+    GST_DEBUG_OBJECT (urisrc, "Linking pending pad %" GST_PTR_FORMAT
+        " to existing output slot %p", out_info->demux_src_pad, slot);
 
     if (in_info) {
       gst_pad_unlink (in_info->demux_src_pad, slot->sinkpad);
@@ -1042,6 +1067,11 @@ link_pending_pad_to_output (GstURISourceBin * urisrc, OutputSlotInfo * slot)
             slot->sinkpad) == GST_PAD_LINK_OK) {
       out_info->output_slot = slot;
       slot->linked_info = out_info;
+
+      BUFFERING_LOCK (urisrc);
+      /* A re-linked slot is no longer EOS */
+      slot->is_eos = FALSE;
+      BUFFERING_UNLOCK (urisrc);
       res = TRUE;
       urisrc->pending_pads =
           g_list_remove (urisrc->pending_pads, out_info->demux_src_pad);
@@ -1093,8 +1123,14 @@ demux_pad_events (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
         goto done;
       }
 
+      BUFFERING_LOCK (urisrc);
       /* Mark that we fed an EOS to this slot */
       child_info->output_slot->is_eos = TRUE;
+      BUFFERING_UNLOCK (urisrc);
+
+      /* EOS means this element is no longer buffering */
+      remove_buffering_msgs (urisrc,
+          GST_OBJECT_CAST (child_info->output_slot->queue));
 
       /* Actually feed a custom EOS event to avoid marking pads as EOSed */
       s = gst_structure_new_empty ("urisourcebin-custom-eos");
@@ -1112,7 +1148,9 @@ demux_pad_events (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
       break;
     case GST_EVENT_STREAM_START:
     case GST_EVENT_FLUSH_STOP:
+      BUFFERING_LOCK (urisrc);
       child_info->output_slot->is_eos = FALSE;
+      BUFFERING_UNLOCK (urisrc);
       break;
     default:
       break;
@@ -1370,17 +1408,23 @@ pad_removed_cb (GstElement * element, GstPad * pad, GstURISourceBin * urisrc)
     GstEvent *event;
     OutputSlotInfo *slot;
 
-    if (!info->output_slot->is_eos && urisrc->pending_pads &&
-        link_pending_pad_to_output (urisrc, info->output_slot)) {
+    slot = info->output_slot;
+
+    if (!slot->is_eos && urisrc->pending_pads &&
+        link_pending_pad_to_output (urisrc, slot)) {
       /* Found a new source pad to give this slot data - no need to send EOS */
       GST_URI_SOURCE_BIN_UNLOCK (urisrc);
       return;
     }
 
-    /* Unlink this pad from its output slot and send a fake EOS event to drain the
-     * queue */
-    slot = info->output_slot;
+    BUFFERING_LOCK (urisrc);
+    /* Unlink this pad from its output slot and send a fake EOS event
+     * to drain the queue */
     slot->is_eos = TRUE;
+    BUFFERING_UNLOCK (urisrc);
+
+    remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue));
+
     slot->linked_info = NULL;
 
     info->output_slot = NULL;
@@ -2378,7 +2422,7 @@ handle_redirect_message (GstURISourceBin * dec, GstMessage * msg)
   return new_msg;
 }
 
-static GstMessage *
+static void
 handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg)
 {
   gint perc, msg_perc;
@@ -2386,6 +2430,7 @@ handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg)
   GstMessage *smaller = NULL;
   GList *found = NULL;
   GList *iter;
+  OutputSlotInfo *slot;
 
   /* buffering messages must be aggregated as there might be multiple
    * multiqueue in the pipeline and their independent buffering messages
@@ -2405,7 +2450,22 @@ handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg)
   GST_LOG_OBJECT (urisrc, "Got buffering msg from %" GST_PTR_FORMAT
       " with %d%%", GST_MESSAGE_SRC (msg), msg_perc);
 
-  GST_OBJECT_LOCK (urisrc);
+  slot = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (msg)),
+      "urisourcebin.slotinfo");
+
+  BUFFERING_LOCK (urisrc);
+  if (slot && slot->is_eos) {
+    /* Ignore buffering messages from queues we marked as EOS,
+     * we already removed those from the list of buffering
+     * objects */
+    BUFFERING_UNLOCK (urisrc);
+    gst_message_replace (&msg, NULL);
+    return;
+  }
+
+
+  g_mutex_lock (&urisrc->buffering_post_lock);
+
   /*
    * Single loop for 2 things:
    * 1) Look for a message with the same source
@@ -2414,11 +2474,10 @@ handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg)
    */
   for (iter = urisrc->buffering_status; iter;) {
     GstMessage *bufstats = iter->data;
-    OutputSlotInfo *slot =
-        g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (bufstats)),
-        "urisourcebin.slotinfo");
     gboolean is_eos = FALSE;
 
+    slot = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (bufstats)),
+        "urisourcebin.slotinfo");
     if (slot)
       is_eos = slot->is_eos;
 
@@ -2477,16 +2536,17 @@ handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg)
       gst_message_replace (&msg, smaller);
     }
   }
-  GST_OBJECT_UNLOCK (urisrc);
+  BUFFERING_UNLOCK (urisrc);
 
   if (msg) {
     GST_LOG_OBJECT (urisrc, "Sending buffering msg from %" GST_PTR_FORMAT
         " with %d%%", GST_MESSAGE_SRC (msg), smaller_perc);
+    GST_BIN_CLASS (parent_class)->handle_message (GST_BIN (urisrc), msg);
   } else {
     GST_LOG_OBJECT (urisrc, "Dropped buffering msg as a repeat of %d%%",
         smaller_perc);
   }
-  return msg;
+  g_mutex_unlock (&urisrc->buffering_post_lock);
 }
 
 /* Remove any buffering message from the given source */
@@ -2494,8 +2554,14 @@ static void
 remove_buffering_msgs (GstURISourceBin * urisrc, GstObject * src)
 {
   GList *iter;
+  gboolean removed = FALSE, post;
+
+  BUFFERING_LOCK (urisrc);
+  g_mutex_lock (&urisrc->buffering_post_lock);
+
+  GST_DEBUG_OBJECT (urisrc, "Removing %" GST_PTR_FORMAT
+      " buffering messages", src);
 
-  GST_OBJECT_LOCK (urisrc);
   for (iter = urisrc->buffering_status; iter;) {
     GstMessage *bufstats = iter->data;
     if (GST_MESSAGE_SRC (bufstats) == src) {
@@ -2506,7 +2572,19 @@ remove_buffering_msgs (GstURISourceBin * urisrc, GstObject * src)
     }
     iter = g_list_next (iter);
   }
-  GST_OBJECT_UNLOCK (urisrc);
+
+  post = (removed && urisrc->buffering_status == NULL);
+  BUFFERING_UNLOCK (urisrc);
+
+  if (post) {
+    GST_DEBUG_OBJECT (urisrc, "Last buffering element done - posting 100%%");
+
+    /* removed the last buffering element, post 100% */
+    gst_element_post_message (GST_ELEMENT_CAST (urisrc),
+        gst_message_new_buffering (GST_OBJECT_CAST (urisrc), 100));
+  }
+
+  g_mutex_unlock (&urisrc->buffering_post_lock);
 }
 
 static void
@@ -2526,7 +2604,8 @@ handle_message (GstBin * bin, GstMessage * msg)
       break;
     }
     case GST_MESSAGE_BUFFERING:
-      msg = handle_buffering_message (urisrc, msg);
+      handle_buffering_message (urisrc, msg);
+      msg = NULL;
       break;
     default:
       break;