gst/rtpmanager/gstrtpjitterbuffer.c: Fix EOS handling.
authorWim Taymans <wim.taymans@gmail.com>
Thu, 16 Aug 2007 11:40:16 +0000 (11:40 +0000)
committerTim-Philipp Müller <tim.muller@collabora.co.uk>
Tue, 11 Aug 2009 01:30:28 +0000 (02:30 +0100)
Original commit message from CVS:
* gst/rtpmanager/gstrtpjitterbuffer.c:
(gst_rtp_jitter_buffer_sink_event), (gst_rtp_jitter_buffer_chain),
(gst_rtp_jitter_buffer_loop):
Fix EOS handling.
Convert some DEBUG into WARNINGs.
Pause task when flushing.
* gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_class_init),
(rtcp_thread), (gst_rtp_session_event_recv_rtcp_sink):
Use system clock for RTCP session management timeouts.
* gst/rtpmanager/rtpsession.c: (on_new_ssrc), (on_ssrc_collision),
(on_ssrc_validated), (on_bye_ssrc), (on_bye_timeout), (on_timeout):
Release the session lock when emiting signals.

gst/rtpmanager/gstrtpjitterbuffer.c
gst/rtpmanager/gstrtpsession.c
gst/rtpmanager/rtpsession.c

index fe85f87..e66613b 100644 (file)
@@ -696,16 +696,18 @@ gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstEvent * event)
       /* check for flushing, we need to discard the event and return FALSE when
        * we are flushing */
       ret = priv->srcresult == GST_FLOW_OK;
-      if (ret) {
+      if (ret && !priv->eos) {
         GST_DEBUG_OBJECT (jitterbuffer, "queuing EOS");
         priv->eos = TRUE;
         JBUF_SIGNAL (priv);
+      } else if (priv->eos) {
+        GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS");
       } else {
         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s",
             gst_flow_get_name (priv->srcresult));
-        gst_event_unref (event);
       }
       JBUF_UNLOCK (priv);
+      gst_event_unref (event);
       break;
     }
     default:
@@ -863,7 +865,7 @@ invalid_buffer:
   }
 not_negotiated:
   {
-    GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
+    GST_WARNING_OBJECT (jitterbuffer, "No clock-rate in caps!");
     gst_buffer_unref (buffer);
     gst_object_unref (jitterbuffer);
     return GST_FLOW_NOT_NEGOTIATED;
@@ -878,13 +880,13 @@ out_flushing:
 have_eos:
   {
     ret = GST_FLOW_UNEXPECTED;
-    GST_DEBUG_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
+    GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
     gst_buffer_unref (buffer);
     goto finished;
   }
 too_late:
   {
-    GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
+    GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
         " popped, dropping", seqnum, priv->last_popped_seqnum);
     priv->num_late++;
     gst_buffer_unref (buffer);
@@ -892,7 +894,7 @@ too_late:
   }
 duplicate:
   {
-    GST_DEBUG_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
+    GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
         seqnum);
     priv->num_duplicates++;
     gst_buffer_unref (buffer);
@@ -923,13 +925,19 @@ gst_rtp_jitter_buffer_loop (GstRTPJitterBuffer * jitterbuffer)
   JBUF_LOCK_CHECK (priv, flushing);
 again:
   GST_DEBUG_OBJECT (jitterbuffer, "Popping item");
-  /* wait if we are blocked or don't have a packet and eos */
-  while (priv->blocked || !(rtp_jitter_buffer_num_packets (priv->jbuf)
-          || priv->eos)) {
+  while (TRUE) {
+
+    /* always wait if we are blocked */
+    if (!priv->blocked) {
+      /* if we have a packet, we can grab it */
+      if (rtp_jitter_buffer_num_packets (priv->jbuf) > 0)
+        break;
+      /* no packets but we are EOS, do eos logic */
+      if (priv->eos)
+        goto do_eos;
+    }
     JBUF_WAIT_CHECK (priv, flushing);
   }
-  if (priv->eos)
-    goto do_eos;
 
   /* pop a buffer, we must have a buffer now */
   outbuf = rtp_jitter_buffer_pop (priv->jbuf);
@@ -955,7 +963,7 @@ again:
 
     if (priv->next_seqnum != -1) {
       /* we expected next_seqnum but received something else, that's a gap */
-      GST_DEBUG_OBJECT (jitterbuffer,
+      GST_WARNING_OBJECT (jitterbuffer,
           "Sequence number GAP detected -> %d instead of %d", priv->next_seqnum,
           seqnum);
     } else {
@@ -1092,6 +1100,7 @@ do_eos:
 flushing:
   {
     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
+    gst_pad_pause_task (priv->srcpad);
     if (outbuf)
       gst_buffer_unref (outbuf);
     JBUF_UNLOCK (priv);
index bb47a29..01153e2 100644 (file)
@@ -537,9 +537,10 @@ rtcp_thread (GstRTPSession * rtpsession)
   GstClockTime current_time;
   GstClockTime next_timeout;
 
-  clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession));
+  /* RTCP timeouts we use the system clock */
+  clock = gst_system_clock_obtain ();
   if (clock == NULL)
-    return;
+    goto no_clock;
 
   current_time = gst_clock_get_time (clock);
 
@@ -590,6 +591,15 @@ rtcp_thread (GstRTPSession * rtpsession)
   gst_object_unref (clock);
 
   GST_DEBUG_OBJECT (rtpsession, "leaving RTCP thread");
+  return;
+
+  /* ERRORS */
+no_clock:
+  {
+    GST_ELEMENT_ERROR (rtpsession, CORE, CLOCK, (NULL),
+        ("Could not get system clock"));
+    return;
+  }
 }
 
 static gboolean
@@ -900,6 +910,10 @@ gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstEvent * event)
 
   switch (GST_EVENT_TYPE (event)) {
     default:
+      if (rtpsession->send_rtcp_src) {
+        gst_event_ref (event);
+        ret = gst_pad_push_event (rtpsession->send_rtcp_src, event);
+      }
       ret = gst_pad_push_event (rtpsession->sync_src, event);
       break;
   }
index 2b3bcb8..9ab3b4a 100644 (file)
@@ -251,39 +251,51 @@ rtp_session_get_property (GObject * object, guint prop_id,
 static void
 on_new_ssrc (RTPSession * sess, RTPSource * source)
 {
+  RTP_SESSION_UNLOCK (sess);
   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source);
+  RTP_SESSION_LOCK (sess);
 }
 
 static void
 on_ssrc_collision (RTPSession * sess, RTPSource * source)
 {
+  RTP_SESSION_UNLOCK (sess);
   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
       source);
+  RTP_SESSION_LOCK (sess);
 }
 
 static void
 on_ssrc_validated (RTPSession * sess, RTPSource * source)
 {
+  RTP_SESSION_UNLOCK (sess);
   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
       source);
+  RTP_SESSION_LOCK (sess);
 }
 
 static void
 on_bye_ssrc (RTPSession * sess, RTPSource * source)
 {
+  RTP_SESSION_UNLOCK (sess);
   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
+  RTP_SESSION_LOCK (sess);
 }
 
 static void
 on_bye_timeout (RTPSession * sess, RTPSource * source)
 {
+  RTP_SESSION_UNLOCK (sess);
   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
+  RTP_SESSION_LOCK (sess);
 }
 
 static void
 on_timeout (RTPSession * sess, RTPSource * source)
 {
+  RTP_SESSION_UNLOCK (sess);
   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
+  RTP_SESSION_LOCK (sess);
 }
 
 /**