rtpjitterbuffer: serialize events in the buffer
authorWim Taymans <wtaymans@redhat.com>
Tue, 10 Dec 2013 10:57:37 +0000 (11:57 +0100)
committerWim Taymans <wtaymans@redhat.com>
Tue, 10 Dec 2013 10:57:37 +0000 (11:57 +0100)
Serialize events into the jitterbuffer by inserting them with a -1
seqnum.
Update unit test to expect events from the streaming thread.

Fixes https://bugzilla.gnome.org/show_bug.cgi?id=652986

gst/rtpmanager/gstrtpjitterbuffer.c
tests/check/elements/rtpjitterbuffer.c

index 1500120..464bb09 100644 (file)
@@ -730,6 +730,7 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
 
 #define ITEM_TYPE_BUFFER        0
 #define ITEM_TYPE_LOST          1
+#define ITEM_TYPE_EVENT         2
 
 static RTPJitterBufferItem *
 alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts,
@@ -1303,18 +1304,13 @@ gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
   return ret;
 }
 
+/* handles and stores the event in the jitterbuffer, must be called with
+ * LOCK */
 static gboolean
-gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
-    GstEvent * event)
+queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
 {
-  gboolean ret = TRUE;
-  GstRtpJitterBuffer *jitterbuffer;
-  GstRtpJitterBufferPrivate *priv;
-
-  jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
-  priv = jitterbuffer->priv;
-
-  GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
+  GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+  RTPJitterBufferItem *item;
 
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_CAPS:
@@ -1322,20 +1318,12 @@ gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
       GstCaps *caps;
 
       gst_event_parse_caps (event, &caps);
+      if (!gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps))
+        goto wrong_caps;
 
-      JBUF_LOCK (priv);
-      ret = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
-      JBUF_UNLOCK (priv);
-
-      /* set same caps on srcpad on success */
-      if (ret)
-        ret = gst_pad_push_event (priv->srcpad, event);
-      else
-        gst_event_unref (event);
       break;
     }
     case GST_EVENT_SEGMENT:
-    {
       gst_event_copy_segment (event, &priv->segment);
 
       /* we need time for now */
@@ -1344,11 +1332,51 @@ gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
 
       GST_DEBUG_OBJECT (jitterbuffer,
           "newsegment:  %" GST_SEGMENT_FORMAT, &priv->segment);
-
-      /* FIXME, push SEGMENT in the queue. Sorting order might be difficult. */
-      ret = gst_pad_push_event (priv->srcpad, event);
       break;
-    }
+    case GST_EVENT_EOS:
+      priv->eos = TRUE;
+      break;
+    default:
+      break;
+  }
+
+
+  GST_DEBUG_OBJECT (jitterbuffer, "adding event");
+  item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
+  rtp_jitter_buffer_insert (priv->jbuf, item, NULL, NULL);
+  JBUF_SIGNAL_EVENT (priv);
+
+  return TRUE;
+
+  /* ERRORS */
+wrong_caps:
+  {
+    GST_DEBUG_OBJECT (jitterbuffer, "received invalid caps");
+    gst_event_unref (event);
+    return FALSE;
+  }
+newseg_wrong_format:
+  {
+    GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
+    gst_event_unref (event);
+    return FALSE;
+  }
+}
+
+static gboolean
+gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
+    GstEvent * event)
+{
+  gboolean ret = TRUE;
+  GstRtpJitterBuffer *jitterbuffer;
+  GstRtpJitterBufferPrivate *priv;
+
+  jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
+  priv = jitterbuffer->priv;
+
+  GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
+
+  switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
       ret = gst_pad_push_event (priv->srcpad, event);
       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
@@ -1361,43 +1389,49 @@ gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
           gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
           GST_PAD_MODE_PUSH, TRUE);
       break;
-    case GST_EVENT_EOS:
-    {
-      /* push EOS in queue. We always push it at the head */
-      JBUF_LOCK (priv);
-      /* check for flushing, we need to discard the event and return FALSE when
-       * we are flushing */
-      ret = priv->srcresult == GST_FLOW_OK;
-      if (ret && !priv->eos) {
-        GST_INFO_OBJECT (jitterbuffer, "queuing EOS");
-        priv->eos = TRUE;
-        JBUF_SIGNAL_EVENT (priv);
-      } else if (priv->eos) {
-        GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS");
+    default:
+      if (GST_EVENT_IS_SERIALIZED (event)) {
+        /* serialized events go in the queue */
+        JBUF_LOCK (priv);
+        if (priv->srcresult != GST_FLOW_OK) {
+          /* Errors in sticky event pushing are no problem and ignored here
+           * as they will cause more meaningful errors during data flow.
+           * For EOS events, that are not followed by data flow, we still
+           * return FALSE here though.
+           */
+          if (!GST_EVENT_IS_STICKY (event) ||
+              GST_EVENT_TYPE (event) == GST_EVENT_EOS)
+            goto out_flow_error;
+        }
+        /* refuse more events on EOS */
+        if (priv->eos)
+          goto out_eos;
+        ret = queue_event (jitterbuffer, event);
+        JBUF_UNLOCK (priv);
       } else {
-        GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s",
-            gst_flow_get_name (priv->srcresult));
+        /* non-serialized events are forwarded downstream immediately */
+        ret = gst_pad_push_event (priv->srcpad, event);
       }
-      JBUF_UNLOCK (priv);
-      gst_event_unref (event);
-      break;
-    }
-    default:
-      ret = gst_pad_push_event (priv->srcpad, event);
       break;
   }
-
-done:
-
   return ret;
 
   /* ERRORS */
-newseg_wrong_format:
+out_flow_error:
   {
-    GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
-    ret = FALSE;
+    GST_DEBUG_OBJECT (jitterbuffer,
+        "refusing event, we have a downstream flow error: %s",
+        gst_flow_get_name (priv->srcresult));
+    JBUF_UNLOCK (priv);
     gst_event_unref (event);
-    goto done;
+    return FALSE;
+  }
+out_eos:
+  {
+    GST_DEBUG_OBJECT (jitterbuffer, "refusing event, we are EOS");
+    JBUF_UNLOCK (priv);
+    gst_event_unref (event);
+    return FALSE;
   }
 }
 
index edda166..2169718 100644 (file)
@@ -536,8 +536,12 @@ setup_testharness (TestData * data)
   gst_pad_set_caps (data->test_src_pad, generate_caps ());
   gst_pad_push_event (data->test_src_pad, gst_event_new_segment (&seg));
 
-  while ((obj = g_async_queue_try_pop (data->sink_event_queue)))
-    gst_mini_object_unref (obj);
+  obj = g_async_queue_pop (data->sink_event_queue);
+  gst_mini_object_unref (obj);
+  obj = g_async_queue_pop (data->sink_event_queue);
+  gst_mini_object_unref (obj);
+  obj = g_async_queue_pop (data->sink_event_queue);
+  gst_mini_object_unref (obj);
 }
 
 static void