gst/rtpmanager/gstrtpjitterbuffer.c: Only peek at the tail element instead of popping...
authorWim Taymans <wim.taymans@gmail.com>
Fri, 5 Oct 2007 12:07:37 +0000 (12:07 +0000)
committerTim-Philipp Müller <tim.muller@collabora.co.uk>
Tue, 11 Aug 2009 01:30:31 +0000 (02:30 +0100)
Original commit message from CVS:
* gst/rtpmanager/gstrtpjitterbuffer.c:
(gst_rtp_jitter_buffer_chain), (gst_rtp_jitter_buffer_loop):
Only peek at the tail element instead of popping it off, which allows
us to greatly simplify things when the tail element changes.
* gst/rtpmanager/gstrtpsession.c:
(gst_rtp_session_event_recv_rtp_sink):
* gst/rtpmanager/gstrtpssrcdemux.c:
(gst_rtp_ssrc_demux_sink_event):
Forward FLUSH events instead of leaking them.
* gst/rtpmanager/rtpjitterbuffer.c: (rtp_jitter_buffer_reset_skew),
(calculate_skew), (rtp_jitter_buffer_insert):
* gst/rtpmanager/rtpjitterbuffer.h:
Remove the tail-changed callback in favour of a simple boolean when we
insert a buffer in the queue.
Add method to peek the tail of the buffer.

gst/rtpmanager/gstrtpjitterbuffer.c
gst/rtpmanager/gstrtpsession.c
gst/rtpmanager/gstrtpssrcdemux.c
gst/rtpmanager/rtpjitterbuffer.c
gst/rtpmanager/rtpjitterbuffer.h

index e087a7f..84e0a6d 100644 (file)
@@ -135,6 +135,7 @@ struct _GstRtpJitterBufferPrivate
   RTPJitterBuffer *jbuf;
   GMutex *jbuf_lock;
   GCond *jbuf_cond;
+  gboolean waiting;
 
   /* properties */
   guint latency_ms;
@@ -798,6 +799,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
   GstFlowReturn ret = GST_FLOW_OK;
   GstClockTime timestamp;
   guint64 latency_ts;
+  gboolean tail;
 
   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
 
@@ -868,19 +870,19 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
   /* now insert the packet into the queue in sorted order. This function returns
    * FALSE if a packet with the same seqnum was already in the queue, meaning we
    * have a duplicate. */
-  if (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp))
+  if (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp, &tail))
     goto duplicate;
 
-  /* signal addition of new buffer */
-  JBUF_SIGNAL (priv);
+  /* signal addition of new buffer when the _loop is waiting. */
+  if (priv->waiting)
+    JBUF_SIGNAL (priv);
 
   /* let's unschedule and unblock any waiting buffers. We only want to do this
-   * if there is a currently waiting newer (> seqnum) buffer  */
-  if (priv->clock_id) {
-    if (priv->waiting_seqnum > seqnum) {
-      gst_clock_id_unschedule (priv->clock_id);
-      GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting buffer");
-    }
+   * when the tail buffer changed */
+  if (priv->clock_id && tail) {
+    GST_DEBUG_OBJECT (jitterbuffer,
+        "Unscheduling waiting buffer, new tail buffer");
+    gst_clock_id_unschedule (priv->clock_id);
   }
 
   GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets",
@@ -963,7 +965,7 @@ apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
  *
  * For each pushed buffer, the seqnum is recorded, if the next buffer B has a
  * different seqnum (missing packets before B), this function will wait for the
- * missing packet to arrive up to the rtp timestamp of buffer B.
+ * missing packet to arrive up to the timestamp of buffer B.
  */
 static void
 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
@@ -978,7 +980,7 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
 
   JBUF_LOCK_CHECK (priv, flushing);
 again:
-  GST_DEBUG_OBJECT (jitterbuffer, "Popping item");
+  GST_DEBUG_OBJECT (jitterbuffer, "Peeking item");
   while (TRUE) {
 
     /* always wait if we are blocked */
@@ -991,11 +993,17 @@ again:
         goto do_eos;
     }
     /* wait for packets or flushing now */
+    priv->waiting = TRUE;
     JBUF_WAIT_CHECK (priv, flushing);
+    priv->waiting = FALSE;
   }
 
-  /* pop a buffer, we must have a buffer now */
-  outbuf = rtp_jitter_buffer_pop (priv->jbuf);
+  /* peek a buffer, we're just looking at the timestamp and the sequence number.
+   * If all is fine, we'll pop and push it. If the sequence number is wrong we
+   * wait on the timestamp. In the chain function we will unlock the wait when a
+   * new buffer is available. The peeked buffer is valid for as long as we hold
+   * the jitterbuffer lock. */
+  outbuf = rtp_jitter_buffer_peek (priv->jbuf);
   seqnum = gst_rtp_buffer_get_seq (outbuf);
 
   /* get the timestamp, this is already corrected for clock skew by the
@@ -1003,7 +1011,7 @@ again:
   timestamp = GST_BUFFER_TIMESTAMP (outbuf);
 
   GST_DEBUG_OBJECT (jitterbuffer,
-      "Popped buffer #%d, timestamp %" GST_TIME_FORMAT ", now %d left",
+      "Peeked buffer #%d, timestamp %" GST_TIME_FORMAT ", now %d left",
       seqnum, GST_TIME_ARGS (timestamp),
       rtp_jitter_buffer_num_packets (priv->jbuf));
 
@@ -1082,20 +1090,15 @@ again:
     if (ret == GST_CLOCK_UNSCHEDULED) {
       GST_DEBUG_OBJECT (jitterbuffer,
           "Wait got unscheduled, will retry to push with new buffer");
-      /* reinsert popped buffer into queue, no need to recalculate skew, we do
-       * that when inserting the buffer in the chain function */
-      if (!rtp_jitter_buffer_insert (priv->jbuf, outbuf, -1)) {
-        GST_DEBUG_OBJECT (jitterbuffer,
-            "Duplicate packet #%d detected, dropping", seqnum);
-        priv->num_duplicates++;
-        gst_buffer_unref (outbuf);
-      }
       goto again;
     }
     /* Get new timestamp, latency might have changed */
     out_time = apply_offset (jitterbuffer, timestamp);
   }
 push_buffer:
+  /* when we get here we are ready to pop and push the buffer */
+  outbuf = rtp_jitter_buffer_pop (priv->jbuf);
+
   /* check if we are pushing something unexpected */
   if (priv->next_seqnum != -1 && priv->next_seqnum != seqnum) {
     gint dropped;
index 773f090..8bf7adc 100644 (file)
@@ -1020,6 +1020,7 @@ gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event)
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_STOP:
       gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
+      ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
       break;
     case GST_EVENT_NEWSEGMENT:
     {
index a5956d1..472f5f5 100644 (file)
@@ -344,7 +344,6 @@ gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event)
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_STOP:
       gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
-      break;
     case GST_EVENT_NEWSEGMENT:
     default:
     {
index a002cc3..f3c2a2a 100644 (file)
@@ -100,16 +100,6 @@ rtp_jitter_buffer_new (void)
 }
 
 void
-rtp_jitter_buffer_set_tail_changed (RTPJitterBuffer * jbuf, RTPTailChanged func,
-    gpointer user_data)
-{
-  g_return_if_fail (jbuf != NULL);
-
-  jbuf->tail_changed = func;
-  jbuf->user_data = user_data;
-}
-
-void
 rtp_jitter_buffer_set_clock_rate (RTPJitterBuffer * jbuf, gint clock_rate)
 {
   g_return_if_fail (jbuf != NULL);
@@ -374,6 +364,7 @@ compare_seqnum (GstBuffer * a, GstBuffer * b, RTPJitterBuffer * jbuf)
  * @jbuf: an #RTPJitterBuffer
  * @buf: a buffer
  * @time: a running_time when this buffer was received in nanoseconds
+ * @tail: TRUE when the tail element changed.
  *
  * Inserts @buf into the packet queue of @jbuf. The sequence number of the
  * packet will be used to sort the packets. This function takes ownerhip of
@@ -383,7 +374,7 @@ compare_seqnum (GstBuffer * a, GstBuffer * b, RTPJitterBuffer * jbuf)
  */
 gboolean
 rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf,
-    GstClockTime time)
+    GstClockTime time, gboolean * tail)
 {
   GList *list;
   gint func_ret = 1;
@@ -412,13 +403,12 @@ rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf,
 
   if (list)
     g_queue_insert_before (jbuf->packets, list, buf);
-  else {
+  else
     g_queue_push_tail (jbuf->packets, buf);
 
-    /* tail buffer changed, signal callback */
-    if (jbuf->tail_changed)
-      jbuf->tail_changed (jbuf, jbuf->user_data);
-  }
+  /* tail was changed when we did not find a previous packet */
+  if (tail)
+    *tail = (list == NULL);
 
   return TRUE;
 }
index d9903a1..3648e18 100644 (file)
@@ -67,9 +67,6 @@ struct _RTPJitterBuffer {
   gint64         window_min;
   gint64         skew;
   gint64         prev_send_diff;
-
-  RTPTailChanged tail_changed;
-  gpointer       user_data;
 };
 
 struct _RTPJitterBufferClass {
@@ -81,15 +78,14 @@ GType rtp_jitter_buffer_get_type (void);
 /* managing lifetime */
 RTPJitterBuffer*      rtp_jitter_buffer_new              (void);
 
-void                  rtp_jitter_buffer_set_tail_changed (RTPJitterBuffer *jbuf, RTPTailChanged func, 
-                                                          gpointer user_data);
-
 void                  rtp_jitter_buffer_set_clock_rate   (RTPJitterBuffer *jbuf, gint clock_rate);
 gint                  rtp_jitter_buffer_get_clock_rate   (RTPJitterBuffer *jbuf);
 
 void                  rtp_jitter_buffer_reset_skew       (RTPJitterBuffer *jbuf);
 
-gboolean              rtp_jitter_buffer_insert           (RTPJitterBuffer *jbuf, GstBuffer *buf, GstClockTime time);
+gboolean              rtp_jitter_buffer_insert           (RTPJitterBuffer *jbuf, GstBuffer *buf,
+                                                         GstClockTime time, gboolean *tail);
+GstBuffer *           rtp_jitter_buffer_peek             (RTPJitterBuffer *jbuf);
 GstBuffer *           rtp_jitter_buffer_pop              (RTPJitterBuffer *jbuf);
 
 void                  rtp_jitter_buffer_flush            (RTPJitterBuffer *jbuf);