rtpjitterbuffer: add support for serialized queries
authorWim Taymans <wtaymans@redhat.com>
Fri, 14 Feb 2014 14:59:46 +0000 (15:59 +0100)
committerWim Taymans <wtaymans@redhat.com>
Fri, 14 Feb 2014 14:59:46 +0000 (15:59 +0100)
See https://bugzilla.gnome.org/show_bug.cgi?id=723850

gst/rtpmanager/gstrtpjitterbuffer.c

index 6cade80..0a91601 100644 (file)
@@ -195,6 +195,24 @@ enum
   }                                                      \
 } G_STMT_END
 
+#define JBUF_WAIT_QUERY(priv,label) G_STMT_START {       \
+  GST_DEBUG ("waiting query");                           \
+  (priv)->waiting_query = TRUE;                          \
+  g_cond_wait (&(priv)->jbuf_query, &(priv)->jbuf_lock); \
+  (priv)->waiting_query = FALSE;                         \
+  GST_DEBUG ("waiting query done");                      \
+  if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
+    goto label;                                          \
+} G_STMT_END
+#define JBUF_SIGNAL_QUERY(priv,res) G_STMT_START {       \
+  (priv)->last_query = res;                              \
+  if (G_UNLIKELY ((priv)->waiting_query)) {              \
+    GST_DEBUG ("signal query");                          \
+    g_cond_signal (&(priv)->jbuf_query);                 \
+  }                                                      \
+} G_STMT_END
+
+
 struct _GstRtpJitterBufferPrivate
 {
   GstPad *sinkpad, *srcpad;
@@ -206,6 +224,9 @@ struct _GstRtpJitterBufferPrivate
   GCond jbuf_timer;
   gboolean waiting_event;
   GCond jbuf_event;
+  gboolean waiting_query;
+  GCond jbuf_query;
+  gboolean last_query;
   gboolean discont;
   gboolean ts_discont;
   gboolean active;
@@ -704,6 +725,7 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
   g_mutex_init (&priv->jbuf_lock);
   g_cond_init (&priv->jbuf_timer);
   g_cond_init (&priv->jbuf_event);
+  g_cond_init (&priv->jbuf_query);
 
   /* reset skew detection initialy */
   rtp_jitter_buffer_reset_skew (priv->jbuf);
@@ -739,9 +761,12 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
   GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
 }
 
+#define IS_DROPABLE(it) (((it)->type == ITEM_TYPE_BUFFER) || ((it)->type == ITEM_TYPE_LOST))
+
 #define ITEM_TYPE_BUFFER        0
 #define ITEM_TYPE_LOST          1
 #define ITEM_TYPE_EVENT         2
+#define ITEM_TYPE_QUERY         3
 
 static RTPJitterBufferItem *
 alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts,
@@ -784,6 +809,7 @@ gst_rtp_jitter_buffer_finalize (GObject * object)
   g_mutex_clear (&priv->jbuf_lock);
   g_cond_clear (&priv->jbuf_timer);
   g_cond_clear (&priv->jbuf_event);
+  g_cond_clear (&priv->jbuf_query);
 
   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
   g_object_unref (priv->jbuf);
@@ -1124,6 +1150,7 @@ gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
   /* this unblocks any waiting pops on the src pad task */
   JBUF_SIGNAL_EVENT (priv);
+  JBUF_SIGNAL_QUERY (priv, FALSE);
   JBUF_UNLOCK (priv);
 }
 
@@ -1261,6 +1288,7 @@ gst_rtp_jitter_buffer_change_state (GstElement * element,
       priv->timer_running = FALSE;
       unschedule_current_timer (jitterbuffer);
       JBUF_SIGNAL_TIMER (priv);
+      JBUF_SIGNAL_QUERY (priv, FALSE);
       JBUF_UNLOCK (priv);
       g_thread_join (priv->timer_thread);
       priv->timer_thread = NULL;
@@ -2224,8 +2252,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
 
       old_item = rtp_jitter_buffer_peek (priv->jbuf);
 
-      /* only drop non-event buffers */
-      if (old_item->type != ITEM_TYPE_EVENT) {
+      if (IS_DROPABLE (old_item)) {
         old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
         GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
             old_item);
@@ -2404,53 +2431,62 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum)
   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
   GstFlowReturn result;
   RTPJitterBufferItem *item;
-  GstBuffer *outbuf;
-  GstEvent *outevent;
+  GstBuffer *outbuf = NULL;
+  GstEvent *outevent = NULL;
+  GstQuery *outquery = NULL;
   GstClockTime dts, pts;
   gint percent = -1;
-  gboolean is_buffer, do_push = TRUE;
+  gboolean do_push = TRUE;
+  guint type;
 
   /* when we get here we are ready to pop and push the buffer */
   item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
+  type = item->type;
 
-  is_buffer = GST_IS_BUFFER (item->data);
-
-  if (is_buffer) {
-    check_buffering_percent (jitterbuffer, &percent);
+  switch (type) {
+    case ITEM_TYPE_BUFFER:
+      check_buffering_percent (jitterbuffer, &percent);
 
-    /* we need to make writable to change the flags and timestamps */
-    outbuf = gst_buffer_make_writable (item->data);
+      /* we need to make writable to change the flags and timestamps */
+      outbuf = gst_buffer_make_writable (item->data);
 
-    if (G_UNLIKELY (priv->discont)) {
-      /* set DISCONT flag when we missed a packet. We pushed the buffer writable
-       * into the jitterbuffer so we can modify now. */
-      GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
-      GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
-      priv->discont = FALSE;
-    }
-    if (G_UNLIKELY (priv->ts_discont)) {
-      GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
-      priv->ts_discont = FALSE;
-    }
+      if (G_UNLIKELY (priv->discont)) {
+        /* set DISCONT flag when we missed a packet. We pushed the buffer writable
+         * into the jitterbuffer so we can modify now. */
+        GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
+        GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
+        priv->discont = FALSE;
+      }
+      if (G_UNLIKELY (priv->ts_discont)) {
+        GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
+        priv->ts_discont = FALSE;
+      }
 
-    dts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->dts);
-    pts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->pts);
+      dts =
+          gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->dts);
+      pts =
+          gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->pts);
 
-    /* apply timestamp with offset to buffer now */
-    GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
-    GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
+      /* apply timestamp with offset to buffer now */
+      GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
+      GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
 
-    /* update the elapsed time when we need to check against the npt stop time. */
-    update_estimated_eos (jitterbuffer, item);
+      /* update the elapsed time when we need to check against the npt stop time. */
+      update_estimated_eos (jitterbuffer, item);
 
-    priv->last_out_time = GST_BUFFER_PTS (outbuf);
-  } else {
-    outevent = item->data;
-    if (item->type == ITEM_TYPE_LOST) {
+      priv->last_out_time = GST_BUFFER_PTS (outbuf);
+      break;
+    case ITEM_TYPE_LOST:
       priv->discont = TRUE;
       if (!priv->do_lost)
         do_push = FALSE;
-    }
+      /* FALLTHROUGH */
+    case ITEM_TYPE_EVENT:
+      outevent = item->data;
+      break;
+    case ITEM_TYPE_QUERY:
+      outquery = item->data;
+      break;
   }
 
   /* now we are ready to push the buffer. Save the seqnum and release the lock
@@ -2464,28 +2500,45 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum)
   item->data = NULL;
   free_item (item);
 
-  if (is_buffer) {
-    /* push buffer */
-    if (percent != -1)
-      post_buffering_percent (jitterbuffer, percent);
+  switch (type) {
+    case ITEM_TYPE_BUFFER:
+      /* push buffer */
+      if (percent != -1)
+        post_buffering_percent (jitterbuffer, percent);
 
-    GST_DEBUG_OBJECT (jitterbuffer,
-        "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
-        seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
-        GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
-    result = gst_pad_push (priv->srcpad, outbuf);
-  } else {
-    GST_DEBUG_OBJECT (jitterbuffer, "Pushing event %d", seqnum);
+      GST_DEBUG_OBJECT (jitterbuffer,
+          "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
+          seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
+          GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
+      result = gst_pad_push (priv->srcpad, outbuf);
 
-    if (do_push)
-      gst_pad_push_event (priv->srcpad, outevent);
-    else
-      gst_event_unref (outevent);
+      JBUF_LOCK_CHECK (priv, out_flushing);
+      break;
+    case ITEM_TYPE_LOST:
+    case ITEM_TYPE_EVENT:
+      GST_DEBUG_OBJECT (jitterbuffer, "Pushing event %d", seqnum);
 
-    result = GST_FLOW_OK;
-  }
-  JBUF_LOCK_CHECK (priv, out_flushing);
+      if (do_push)
+        gst_pad_push_event (priv->srcpad, outevent);
+      else
+        gst_event_unref (outevent);
+
+      result = GST_FLOW_OK;
 
+      JBUF_LOCK_CHECK (priv, out_flushing);
+      break;
+    case ITEM_TYPE_QUERY:
+    {
+      gboolean res;
+
+      res = gst_pad_peer_query (priv->srcpad, outquery);
+
+      JBUF_LOCK_CHECK (priv, out_flushing);
+      GST_LOG_OBJECT (jitterbuffer, "did query %p, return %d", outquery, res);
+      JBUF_SIGNAL_QUERY (priv, res);
+      break;
+    }
+  }
   return result;
 
   /* ERRORS */
@@ -2904,7 +2957,7 @@ static void
 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
 {
   GstRtpJitterBufferPrivate *priv;
-  GstFlowReturn result;
+  GstFlowReturn result = GST_FLOW_OK;
 
   priv = jitterbuffer->priv;
 
@@ -2920,8 +2973,6 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
   while (result == GST_FLOW_OK);
   /* store result for upstream */
   priv->srcresult = result;
-  JBUF_UNLOCK (priv);
-
   /* if we get here we need to pause */
   goto pause;
 
@@ -2929,15 +2980,17 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
 flushing:
   {
     result = priv->srcresult;
-    JBUF_UNLOCK (priv);
     goto pause;
   }
 pause:
   {
-    const gchar *reason = gst_flow_get_name (result);
     GstEvent *event;
 
-    GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason);
+    JBUF_SIGNAL_QUERY (priv, FALSE);
+    JBUF_UNLOCK (priv);
+
+    GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s",
+        gst_flow_get_name (result));
     gst_pad_pause_task (priv->srcpad);
     if (result == GST_FLOW_EOS) {
       event = gst_event_new_eos ();
@@ -3113,6 +3166,11 @@ gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
     GstQuery * query)
 {
   gboolean res = FALSE;
+  GstRtpJitterBuffer *jitterbuffer;
+  GstRtpJitterBufferPrivate *priv;
+
+  jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
+  priv = jitterbuffer->priv;
 
   switch (GST_QUERY_TYPE (query)) {
     case GST_QUERY_CAPS:
@@ -3128,14 +3186,30 @@ gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
     }
     default:
       if (GST_QUERY_IS_SERIALIZED (query)) {
-        GST_WARNING_OBJECT (pad, "unhandled serialized query");
-        res = FALSE;
+        RTPJitterBufferItem *item;
+
+        JBUF_LOCK_CHECK (priv, out_flushing);
+        GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query");
+        item = alloc_item (query, ITEM_TYPE_QUERY, -1, -1, -1, 0, -1);
+        rtp_jitter_buffer_insert (priv->jbuf, item, NULL, NULL);
+        JBUF_SIGNAL_EVENT (priv);
+        JBUF_WAIT_QUERY (priv, out_flushing);
+        res = priv->last_query;
+        JBUF_UNLOCK (priv);
       } else {
         res = gst_pad_query_default (pad, parent, query);
       }
       break;
   }
   return res;
+  /* ERRORS */
+out_flushing:
+  {
+    GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
+    JBUF_UNLOCK (priv);
+    return FALSE;
+  }
+
 }
 
 static gboolean