rtpsession: use proper locking for pads and caps
authorWim Taymans <wim.taymans@collabora.co.uk>
Mon, 31 Aug 2009 14:34:14 +0000 (16:34 +0200)
committerWim Taymans <wim.taymans@collabora.co.uk>
Mon, 31 Aug 2009 14:38:27 +0000 (16:38 +0200)
Use the sesion lock and shotdown variable to protect and ref the pads we are
going to push on.

fixes #561825

gst/rtpmanager/gstrtpsession.c

index 14bc993..ac19d08 100644 (file)
@@ -25,7 +25,7 @@
  * session. This session can be used to send and receive RTP and RTCP packets.
  * Based on what REQUEST pads are requested from the session manager, specific
  * functionality can be activated.
- * 
+ *
  * The session manager currently implements RFC 3550 including:
  * <itemizedlist>
  *   <listitem>
  *     <para>Scheduling of RR/SR RTCP packets.</para>
  *   </listitem>
  * </itemizedlist>
- * 
+ *
  * The gstrtpsession will not demux packets based on SSRC or payload type, nor will
  * it correct for packet reordering and jitter. Use #GstRtpsSrcDemux,
  * #GstRtpPtDemux and GstRtpJitterBuffer in addition to #GstRtpSession to
  * perform these tasks. It is usually a good idea to use #GstRtpBin, which
  * combines all these features in one element.
- * 
+ *
  * To use #GstRtpSession as an RTP receiver, request a recv_rtp_sink pad, which will
  * automatically create recv_rtp_src pad. Data received on the recv_rtp_sink pad
  * will be processed in the session and after being validated forwarded on the
  * recv_rtp_src pad.
- * 
+ *
  * To also use #GstRtpSession as an RTCP receiver, request a recv_rtcp_sink pad,
  * which will automatically create a sync_src pad. Packets received on the RTCP
  * pad will be used by the session manager to update the stats and database of
  * the other participants. SR packets will be forwarded on the sync_src pad
  * so that they can be used to perform inter-stream synchronisation when needed.
- * 
+ *
  * If you want the session manager to generate and send RTCP packets, request
  * the send_rtcp_src pad. Packet pushed on this pad contain SR/RR RTCP reports
  * that should be sent to all participants in the session.
- * 
+ *
  * To use #GstRtpSession as a sender, request a send_rtp_sink pad, which will
  * automatically create a send_rtp_src pad. The session manager will modify the
  * SSRC in the RTP packets to its own SSRC and wil forward the packets on the
  * send_rtp_src pad after updating its internal state.
- * 
+ *
  * The session manager needs the clock-rate of the payload types it is handling
  * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
  * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
  * signal.
- * 
+ *
  * <refsect2>
  * <title>Example pipelines</title>
  * |[
@@ -426,7 +426,7 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
   /**
    * GstRtpSession::on-new-ssrc:
    * @sess: the object which received the signal
-   * @ssrc: the SSRC 
+   * @ssrc: the SSRC
    *
    * Notify of a new SSRC that entered @session.
    */
@@ -437,7 +437,7 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
   /**
    * GstRtpSession::on-ssrc_collision:
    * @sess: the object which received the signal
-   * @ssrc: the SSRC 
+   * @ssrc: the SSRC
    *
    * Notify when we have an SSRC collision
    */
@@ -449,7 +449,7 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
   /**
    * GstRtpSession::on-ssrc_validated:
    * @sess: the object which received the signal
-   * @ssrc: the SSRC 
+   * @ssrc: the SSRC
    *
    * Notify of a new SSRC that became validated.
    */
@@ -485,7 +485,7 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
   /**
    * GstRtpSession::on-bye-ssrc:
    * @sess: the object which received the signal
-   * @ssrc: the SSRC 
+   * @ssrc: the SSRC
    *
    * Notify of an SSRC that became inactive because of a BYE packet.
    */
@@ -496,7 +496,7 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
   /**
    * GstRtpSession::on-bye-timeout:
    * @sess: the object which received the signal
-   * @ssrc: the SSRC 
+   * @ssrc: the SSRC
    *
    * Notify of an SSRC that has timed out because of BYE
    */
@@ -507,7 +507,7 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
   /**
    * GstRtpSession::on-timeout:
    * @sess: the object which received the signal
-   * @ssrc: the SSRC 
+   * @ssrc: the SSRC
    *
    * Notify of an SSRC that has timed out
    */
@@ -518,7 +518,7 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
   /**
    * GstRtpSession::on-sender-timeout:
    * @sess: the object which received the signal
-   * @ssrc: the SSRC 
+   * @ssrc: the SSRC
    *
    * Notify of a sender SSRC that has timed out and became a receiver
    */
@@ -949,13 +949,20 @@ gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src,
   GstFlowReturn result;
   GstRtpSession *rtpsession;
   GstRtpSessionPrivate *priv;
+  GstPad *rtp_src;
 
   rtpsession = GST_RTP_SESSION (user_data);
   priv = rtpsession->priv;
 
-  if (rtpsession->recv_rtp_src) {
+  GST_RTP_SESSION_LOCK (rtpsession);
+  if ((rtp_src = rtpsession->recv_rtp_src))
+    gst_object_ref (rtp_src);
+  GST_RTP_SESSION_UNLOCK (rtpsession);
+
+  if (rtp_src) {
     GST_LOG_OBJECT (rtpsession, "pushing received RTP packet");
-    result = gst_pad_push (rtpsession->recv_rtp_src, buffer);
+    result = gst_pad_push (rtp_src, buffer);
+    gst_object_unref (rtp_src);
   } else {
     GST_DEBUG_OBJECT (rtpsession, "dropping received RTP packet");
     gst_buffer_unref (buffer);
@@ -973,19 +980,25 @@ gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src,
   GstFlowReturn result;
   GstRtpSession *rtpsession;
   GstRtpSessionPrivate *priv;
+  GstPad *rtp_src;
 
   rtpsession = GST_RTP_SESSION (user_data);
   priv = rtpsession->priv;
 
-  if (rtpsession->send_rtp_src) {
+  GST_RTP_SESSION_LOCK (rtpsession);
+  if ((rtp_src = rtpsession->send_rtp_src))
+    gst_object_ref (rtp_src);
+  GST_RTP_SESSION_UNLOCK (rtpsession);
+
+  if (rtp_src) {
     if (GST_IS_BUFFER (data)) {
       GST_LOG_OBJECT (rtpsession, "sending RTP packet");
-      result = gst_pad_push (rtpsession->send_rtp_src, GST_BUFFER_CAST (data));
+      result = gst_pad_push (rtp_src, GST_BUFFER_CAST (data));
     } else {
       GST_LOG_OBJECT (rtpsession, "sending RTP list");
-      result = gst_pad_push_list (rtpsession->send_rtp_src,
-          GST_BUFFER_LIST_CAST (data));
+      result = gst_pad_push_list (rtp_src, GST_BUFFER_LIST_CAST (data));
     }
+    gst_object_unref (rtp_src);
   } else {
     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
     result = GST_FLOW_OK;
@@ -1003,37 +1016,55 @@ gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src,
   GstFlowReturn result;
   GstRtpSession *rtpsession;
   GstRtpSessionPrivate *priv;
+  GstPad *rtcp_src;
 
   rtpsession = GST_RTP_SESSION (user_data);
   priv = rtpsession->priv;
 
-  if (rtpsession->send_rtcp_src) {
+  GST_RTP_SESSION_LOCK (rtpsession);
+  if (rtpsession->priv->stop_thread)
+    goto stopping;
+
+  if ((rtcp_src = rtpsession->send_rtcp_src)) {
     GstCaps *caps;
 
     /* set rtcp caps on output pad */
-    caps = GST_PAD_CAPS (rtpsession->send_rtcp_src);
-    if (!caps) {
+    if ((caps = GST_PAD_CAPS (rtcp_src))) {
       caps = gst_caps_new_simple ("application/x-rtcp", NULL);
-      gst_pad_set_caps (rtpsession->send_rtcp_src, caps);
-    } else {
-      gst_caps_ref (caps);
+      gst_pad_set_caps (rtcp_src, caps);
+      gst_caps_unref (caps);
     }
     gst_buffer_set_caps (buffer, caps);
-    gst_caps_unref (caps);
     GST_LOG_OBJECT (rtpsession, "sending RTCP");
-    result = gst_pad_push (rtpsession->send_rtcp_src, buffer);
+
+    gst_object_ref (rtcp_src);
+    GST_RTP_SESSION_UNLOCK (rtpsession);
+
+    result = gst_pad_push (rtcp_src, buffer);
 
     /* we have to send EOS after this packet */
     if (eos) {
       GST_LOG_OBJECT (rtpsession, "sending EOS");
-      gst_pad_push_event (rtpsession->send_rtcp_src, gst_event_new_eos ());
+      gst_pad_push_event (rtcp_src, gst_event_new_eos ());
     }
+    gst_object_unref (rtcp_src);
   } else {
+    GST_RTP_SESSION_UNLOCK (rtpsession);
+
     GST_DEBUG_OBJECT (rtpsession, "not sending RTCP, no output pad");
     gst_buffer_unref (buffer);
     result = GST_FLOW_OK;
   }
   return result;
+
+  /* ERRORS */
+stopping:
+  {
+    GST_DEBUG_OBJECT (rtpsession, "we are stopping");
+    gst_buffer_unref (buffer);
+    GST_RTP_SESSION_UNLOCK (rtpsession);
+    return GST_FLOW_OK;
+  }
 }
 
 /* called when the session manager has an SR RTCP packet ready for handling
@@ -1045,28 +1076,48 @@ gst_rtp_session_sync_rtcp (RTPSession * sess,
   GstFlowReturn result;
   GstRtpSession *rtpsession;
   GstRtpSessionPrivate *priv;
+  GstPad *sync_src;
 
   rtpsession = GST_RTP_SESSION (user_data);
   priv = rtpsession->priv;
 
-  if (rtpsession->sync_src) {
+  GST_RTP_SESSION_LOCK (rtpsession);
+  if (rtpsession->priv->stop_thread)
+    goto stopping;
+
+  if ((sync_src = rtpsession->sync_src)) {
     GstCaps *caps;
 
     /* set rtcp caps on output pad */
-    if (!(caps = GST_PAD_CAPS (rtpsession->sync_src))) {
+    if (!(caps = GST_PAD_CAPS (sync_src))) {
       caps = gst_caps_new_simple ("application/x-rtcp", NULL);
-      gst_pad_set_caps (rtpsession->sync_src, caps);
+      gst_pad_set_caps (sync_src, caps);
       gst_caps_unref (caps);
     }
     gst_buffer_set_caps (buffer, caps);
+    gst_object_ref (sync_src);
+    GST_RTP_SESSION_UNLOCK (rtpsession);
+
     GST_LOG_OBJECT (rtpsession, "sending Sync RTCP");
-    result = gst_pad_push (rtpsession->sync_src, buffer);
+    result = gst_pad_push (sync_src, buffer);
+    gst_object_unref (sync_src);
   } else {
+    GST_RTP_SESSION_UNLOCK (rtpsession);
+
     GST_DEBUG_OBJECT (rtpsession, "not sending Sync RTCP, no output pad");
     gst_buffer_unref (buffer);
     result = GST_FLOW_OK;
   }
   return result;
+
+  /* ERRORS */
+stopping:
+  {
+    GST_DEBUG_OBJECT (rtpsession, "we are stopping");
+    gst_buffer_unref (buffer);
+    GST_RTP_SESSION_UNLOCK (rtpsession);
+    return GST_FLOW_OK;
+  }
 }
 
 static void