gst/rtpmanager/gstrtpbin.c: Link to the right pads regardless of which one was create...
authorWim Taymans <wim.taymans@gmail.com>
Mon, 17 Sep 2007 02:01:41 +0000 (02:01 +0000)
committerTim-Philipp Müller <tim.muller@collabora.co.uk>
Tue, 11 Aug 2009 01:30:30 +0000 (02:30 +0100)
Original commit message from CVS:
* gst/rtpmanager/gstrtpbin.c: (new_ssrc_pad_found):
Link to the right pads regardless of which one was created first in the
ssrc demuxer.
* gst/rtpmanager/gstrtpjitterbuffer.c:
(gst_rtp_jitter_buffer_chain), (gst_rtp_jitter_buffer_loop):
* gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_process_rtp),
(gst_rtp_session_chain_recv_rtp), (gst_rtp_session_chain_send_rtp):
* gst/rtpmanager/rtpsource.c: (calculate_jitter):
Improve debugging.
* gst/rtpmanager/gstrtpssrcdemux.c: (create_demux_pad_for_ssrc),
(gst_rtp_ssrc_demux_init), (gst_rtp_ssrc_demux_finalize),
(gst_rtp_ssrc_demux_sink_event),
(gst_rtp_ssrc_demux_rtcp_sink_event), (gst_rtp_ssrc_demux_chain),
(gst_rtp_ssrc_demux_rtcp_chain),
(gst_rtp_ssrc_demux_internal_links):
* gst/rtpmanager/gstrtpssrcdemux.h:
Fix race in creating the RTP and RTCP pads when a new SSRC is detected.

gst/rtpmanager/gstrtpbin.c
gst/rtpmanager/gstrtpjitterbuffer.c
gst/rtpmanager/gstrtpsession.c
gst/rtpmanager/gstrtpssrcdemux.c
gst/rtpmanager/gstrtpssrcdemux.h
gst/rtpmanager/rtpsource.c

index cdbdaf6..035f4cc 100644 (file)
@@ -1404,8 +1404,11 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
 
   /* get pad and link */
   GST_DEBUG_OBJECT (session->bin, "linking jitterbuffer");
+  padname = g_strdup_printf ("src_%d", ssrc);
+  srcpad = gst_element_get_pad (element, padname);
+  g_free (padname);
   sinkpad = gst_element_get_static_pad (stream->buffer, "sink");
-  gst_pad_link (pad, sinkpad);
+  gst_pad_link (srcpad, sinkpad);
   gst_object_unref (sinkpad);
 
   /* get the RTCP sync pad */
@@ -1434,7 +1437,7 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
 no_stream:
   {
     GST_RTP_SESSION_UNLOCK (session);
-    GST_DEBUG ("could not create stream");
+    GST_DEBUG_OBJECT (session->bin, "could not create stream");
     return;
   }
 }
index 57be42e..d27c2aa 100644 (file)
@@ -797,6 +797,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
   guint16 seqnum;
   GstFlowReturn ret = GST_FLOW_OK;
   GstClockTime timestamp;
+  guint64 latency_ts;
 
   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
 
@@ -849,7 +850,6 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
    * latency is set, we just pump it in the queue and let the other end push it
    * out as fast as possible. */
   if (priv->latency_ms && priv->drop_on_latency) {
-    guint64 latency_ts;
 
     latency_ts =
         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
@@ -1053,8 +1053,8 @@ again:
     if (priv->next_seqnum != -1) {
       /* we expected next_seqnum but received something else, that's a gap */
       GST_WARNING_OBJECT (jitterbuffer,
-          "Sequence number GAP detected -> %d instead of %d", priv->next_seqnum,
-          seqnum);
+          "Sequence number GAP detected: expected %d instead of %d",
+          priv->next_seqnum, seqnum);
     } else {
       /* we don't know what the next_seqnum should be, wait for the last
        * possible moment to push this buffer, maybe we get an earlier seqnum
index 6d4cf8b..83210ff 100644 (file)
@@ -781,11 +781,11 @@ gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src,
   rtpsession = GST_RTP_SESSION (user_data);
   priv = rtpsession->priv;
 
-  GST_DEBUG_OBJECT (rtpsession, "reading receiving RTP packet");
-
   if (rtpsession->recv_rtp_src) {
+    GST_DEBUG_OBJECT (rtpsession, "pushing received RTP packet");
     result = gst_pad_push (rtpsession->recv_rtp_src, buffer);
   } else {
+    GST_DEBUG_OBJECT (rtpsession, "dropping received RTP packet");
     gst_buffer_unref (buffer);
     result = GST_FLOW_OK;
   }
@@ -1114,10 +1114,22 @@ gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer)
   }
 
   ret = rtp_session_process_rtp (priv->session, buffer, ntpnstime);
+  if (ret != GST_FLOW_OK)
+    goto push_error;
+
 
+done:
   gst_object_unref (rtpsession);
 
   return ret;
+
+  /* ERRORS */
+push_error:
+  {
+    GST_DEBUG_OBJECT (rtpsession, "process returned %s",
+        gst_flow_get_name (ret));
+    goto done;
+  }
 }
 
 static GstFlowReturn
@@ -1286,10 +1298,21 @@ gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer)
   }
 
   ret = rtp_session_send_rtp (priv->session, buffer, ntpnstime);
+  if (ret != GST_FLOW_OK)
+    goto push_error;
 
+done:
   gst_object_unref (rtpsession);
 
   return ret;
+
+  /* ERRORS */
+push_error:
+  {
+    GST_DEBUG_OBJECT (rtpsession, "process returned %s",
+        gst_flow_get_name (ret));
+    goto done;
+  }
 }
 
 /* Create sinkpad to receive RTP packets from senders. This will also create a
index 2f1e9e6..a5956d1 100644 (file)
@@ -96,6 +96,9 @@ static GstElementDetails gst_rtp_ssrc_demux_details = {
   "Wim Taymans <wim@fluendo.com>"
 };
 
+#define GST_PAD_LOCK(obj)   (g_mutex_lock ((obj)->padlock))
+#define GST_PAD_UNLOCK(obj) (g_mutex_unlock ((obj)->padlock))
+
 /* signals */
 enum
 {
@@ -159,6 +162,7 @@ find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
   return NULL;
 }
 
+/* with PAD_LOCK */
 static GstRtpSsrcDemuxPad *
 create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
     GstClockTime timestamp)
@@ -202,9 +206,6 @@ create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
 
   demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad);
 
-  /* unlock to perform the remainder and to fire our signal */
-  GST_OBJECT_UNLOCK (demux);
-
   /* copy caps from input */
   gst_pad_set_caps (rtp_pad, GST_PAD_CAPS (demux->rtp_sink));
   gst_pad_use_fixed_caps (rtp_pad);
@@ -227,8 +228,6 @@ create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
   g_signal_emit (G_OBJECT (demux),
       gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad);
 
-  GST_OBJECT_LOCK (demux);
-
   return demuxpad;
 }
 
@@ -304,6 +303,8 @@ gst_rtp_ssrc_demux_init (GstRtpSsrcDemux * demux,
       gst_rtp_ssrc_demux_rtcp_sink_event);
   gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink);
 
+  demux->padlock = g_mutex_new ();
+
   gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
 }
 
@@ -327,6 +328,7 @@ gst_rtp_ssrc_demux_finalize (GObject * object)
   GstRtpSsrcDemux *demux;
 
   demux = GST_RTP_SSRC_DEMUX (object);
+  g_mutex_free (demux->padlock);
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
@@ -349,14 +351,14 @@ gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event)
       GSList *walk;
 
       res = TRUE;
-      GST_OBJECT_LOCK (demux);
+      GST_PAD_LOCK (demux);
       for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
         GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
 
         gst_event_ref (event);
         res &= gst_pad_push_event (pad->rtp_pad, event);
       }
-      GST_OBJECT_UNLOCK (demux);
+      GST_PAD_UNLOCK (demux);
       gst_event_unref (event);
       break;
     }
@@ -381,13 +383,13 @@ gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad, GstEvent * event)
       GSList *walk;
 
       res = TRUE;
-      GST_OBJECT_LOCK (demux);
+      GST_PAD_LOCK (demux);
       for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
         GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
 
         res &= gst_pad_push_event (pad->rtcp_pad, event);
       }
-      GST_OBJECT_UNLOCK (demux);
+      GST_PAD_UNLOCK (demux);
       break;
     }
   }
@@ -412,7 +414,7 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf)
 
   GST_DEBUG_OBJECT (demux, "received buffer of SSRC %08x", ssrc);
 
-  GST_OBJECT_LOCK (demux);
+  GST_PAD_LOCK (demux);
   dpad = find_demux_pad_for_ssrc (demux, ssrc);
   if (dpad == NULL) {
     if (!(dpad =
@@ -420,7 +422,7 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf)
                 GST_BUFFER_TIMESTAMP (buf))))
       goto create_failed;
   }
-  GST_OBJECT_UNLOCK (demux);
+  GST_PAD_UNLOCK (demux);
 
   /* push to srcpad */
   ret = gst_pad_push (dpad->rtp_pad, buf);
@@ -440,7 +442,7 @@ create_failed:
   {
     GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
         ("Could not create new pad"));
-    GST_OBJECT_UNLOCK (demux);
+    GST_PAD_UNLOCK (demux);
     gst_buffer_unref (buf);
     return GST_FLOW_ERROR;
   }
@@ -479,14 +481,14 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstBuffer * buf)
 
   GST_DEBUG_OBJECT (demux, "received RTCP of SSRC %08x", ssrc);
 
-  GST_OBJECT_LOCK (demux);
+  GST_PAD_LOCK (demux);
   dpad = find_demux_pad_for_ssrc (demux, ssrc);
   if (dpad == NULL) {
     GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc);
     if (!(dpad = create_demux_pad_for_ssrc (demux, ssrc, -1)))
       goto create_failed;
   }
-  GST_OBJECT_UNLOCK (demux);
+  GST_PAD_UNLOCK (demux);
 
   /* push to srcpad */
   ret = gst_pad_push (dpad->rtcp_pad, buf);
@@ -506,7 +508,7 @@ create_failed:
   {
     GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
         ("Could not create new pad"));
-    GST_OBJECT_UNLOCK (demux);
+    GST_PAD_UNLOCK (demux);
     gst_buffer_unref (buf);
     return GST_FLOW_ERROR;
   }
@@ -539,7 +541,7 @@ gst_rtp_ssrc_demux_internal_links (GstPad * pad)
 
   demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
 
-  GST_OBJECT_LOCK (demux);
+  GST_PAD_LOCK (demux);
   for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
     GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
 
@@ -555,7 +557,7 @@ gst_rtp_ssrc_demux_internal_links (GstPad * pad)
       break;
     }
   }
-  GST_OBJECT_UNLOCK (demux);
+  GST_PAD_UNLOCK (demux);
 
   gst_object_unref (demux);
   return res;
index 88aeed8..10c569d 100644 (file)
@@ -40,6 +40,8 @@ struct _GstRtpSsrcDemux
 
   GstPad *rtp_sink;
   GstPad *rtcp_sink;
+
+  GMutex *padlock;
   GSList *srcpads;
 };
 
index 4ffc6bb..1d7eb24 100644 (file)
@@ -284,6 +284,8 @@ calculate_jitter (RTPSource * src, GstBuffer * buffer,
 
   pt = gst_rtp_buffer_get_payload_type (buffer);
 
+  GST_DEBUG ("SSRC %08x got payload %d", src->ssrc, pt);
+
   /* get clockrate */
   if ((clock_rate = get_clock_rate (src, pt)) == -1)
     goto no_clock_rate;