gst/rtpmanager/async_jitter_queue.c: Fix the case where the buffer underruns and...
authorWim Taymans <wim.taymans@gmail.com>
Mon, 30 Apr 2007 13:41:30 +0000 (13:41 +0000)
committerTim-Philipp Müller <tim.muller@collabora.co.uk>
Tue, 11 Aug 2009 01:30:26 +0000 (02:30 +0100)
Original commit message from CVS:
* gst/rtpmanager/async_jitter_queue.c: (signal_waiting_threads),
(async_jitter_queue_pop_intern_unlocked):
Fix the case where the buffer underruns and does not block.
* gst/rtpmanager/gstrtpbin.c: (gst_rtp_bin_base_init),
(create_recv_rtcp), (create_send_rtp), (create_rtcp),
(gst_rtp_bin_request_new_pad):
Rename RTCP send pad, like in the session manager.
Allow getting an RTCP pad for receiving even if we don't receive RTP.
fix handling of send_rtp_src pad.
* gst/rtpmanager/gstrtpptdemux.c: (gst_rtp_pt_demux_chain):
When no pt map could be found, fall back to the sinkpad caps.
* gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_process_rtp),
(gst_rtp_session_send_rtp), (create_recv_rtp_sink),
(create_recv_rtcp_sink), (create_send_rtp_sink),
(create_send_rtcp_src):
Fix pad names.
* gst/rtpmanager/rtpsession.c: (source_push_rtp),
(rtp_session_create_source), (rtp_session_process_sr),
(rtp_session_send_rtp), (session_start_rtcp):
* gst/rtpmanager/rtpsession.h:
Unlock session when performing a callback.
Add callbacks for the internal session object.
Fix sending of RTP packets.
first attempt at adding NTP times in the SR packets.
Small debug and doc improvements.
* gst/rtpmanager/rtpsource.c: (rtp_source_send_rtp):
Update stats for SR reports.

gst/rtpmanager/async_jitter_queue.c
gst/rtpmanager/gstrtpbin.c
gst/rtpmanager/gstrtpptdemux.c
gst/rtpmanager/gstrtpsession.c
gst/rtpmanager/rtpsession.c
gst/rtpmanager/rtpsession.h
gst/rtpmanager/rtpsource.c

index 22a8ed0..81ba381 100644 (file)
@@ -100,6 +100,7 @@ signal_waiting_threads (AsyncJitterQueue * queue)
 {
   if (async_jitter_queue_length_ts_units_unlocked (queue) >=
       queue->high_threshold * queue->max_queue_length) {
+    GST_DEBUG ("stop buffering");
     queue->buffering = FALSE;
   }
 
@@ -473,6 +474,7 @@ async_jitter_queue_pop_intern_unlocked (AsyncJitterQueue * queue)
 {
   gpointer retval;
   GstBuffer *tail_buffer = NULL;
+  guint tsunits;
 
   if (queue->pop_flushing)
     return NULL;
@@ -485,20 +487,27 @@ async_jitter_queue_pop_intern_unlocked (AsyncJitterQueue * queue)
       return NULL;
   }
 
-  if (async_jitter_queue_length_ts_units_unlocked (queue) <=
-      queue->low_threshold * queue->max_queue_length
+
+  tsunits = async_jitter_queue_length_ts_units_unlocked (queue);
+
+  GST_DEBUG ("tsunits %u, pops: %u, limit %d", tsunits, queue->pops_remaining,
+      queue->low_threshold * queue->max_queue_length);
+
+  if (tsunits <= queue->low_threshold * queue->max_queue_length
       && queue->pops_remaining == 0) {
     if (!queue->buffering) {
+      GST_DEBUG ("start buffering");
       queue->buffering = TRUE;
       queue->pops_remaining = queue->queue->length;
-    } else {
-      while (!g_queue_peek_tail (queue->queue) || queue->pop_blocking) {
-        queue->waiting_threads++;
-        g_cond_wait (queue->cond, queue->mutex);
-        queue->waiting_threads--;
-        if (queue->pop_flushing)
-          return NULL;
-      }
+    }
+
+    GST_DEBUG ("wait for data");
+    while (!g_queue_peek_tail (queue->queue) || queue->pop_blocking) {
+      queue->waiting_threads++;
+      g_cond_wait (queue->cond, queue->mutex);
+      queue->waiting_threads--;
+      if (queue->pop_flushing)
+        return NULL;
     }
   }
 
index eb82739..927755f 100644 (file)
@@ -84,8 +84,8 @@ GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%d_%d_%d",
     GST_STATIC_CAPS ("application/x-rtp")
     );
 
-static GstStaticPadTemplate rtpbin_rtcp_src_template =
-GST_STATIC_PAD_TEMPLATE ("rtcp_src_%d",
+static GstStaticPadTemplate rtpbin_send_rtcp_src_template =
+GST_STATIC_PAD_TEMPLATE ("send_rtcp_src_%d",
     GST_PAD_SRC,
     GST_PAD_REQUEST,
     GST_STATIC_CAPS ("application/x-rtcp")
@@ -195,7 +195,7 @@ struct _GstRTPBinSession
   GstPad *recv_rtcp_src;
   GstPad *send_rtp_sink;
   GstPad *send_rtp_src;
-  GstPad *rtcp_src;
+  GstPad *send_rtcp_src;
 };
 
 /* find a session with the given id. Must be called with RTP_BIN_LOCK */
@@ -432,7 +432,7 @@ gst_rtp_bin_base_init (gpointer klass)
   gst_element_class_add_pad_template (element_class,
       gst_static_pad_template_get (&rtpbin_recv_rtp_src_template));
   gst_element_class_add_pad_template (element_class,
-      gst_static_pad_template_get (&rtpbin_rtcp_src_template));
+      gst_static_pad_template_get (&rtpbin_send_rtcp_src_template));
   gst_element_class_add_pad_template (element_class,
       gst_static_pad_template_get (&rtpbin_send_rtp_src_template));
 
@@ -795,10 +795,15 @@ create_recv_rtcp (GstRTPBin * rtpbin, GstPadTemplate * templ,
 
   GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid);
 
-  /* get the session, it must exist or we error */
+  /* get or create the session */
   session = find_session_by_id (rtpbin, sessid);
-  if (!session)
-    goto no_session;
+  if (!session) {
+    GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid);
+    /* create session now */
+    session = create_session (rtpbin, sessid);
+    if (session == NULL)
+      goto create_error;
+  }
 
   /* check if pad was requested */
   if (session->recv_rtcp_sink != NULL)
@@ -841,9 +846,9 @@ no_name:
     g_warning ("rtpbin: invalid name given");
     return NULL;
   }
-no_session:
+create_error:
   {
-    g_warning ("rtpbin: no session with id %d", sessid);
+    /* create_session already warned */
     return NULL;
   }
 existed:
@@ -872,7 +877,7 @@ link_failed:
 static GstPad *
 create_send_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name)
 {
-  GstPad *result, *srcpad, *srcghost;
+  GstPad *result, *srcghost;
   gchar *gname;
   guint sessid;
   GstRTPBinSession *session;
@@ -895,7 +900,7 @@ create_send_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name)
   if (session->send_rtp_sink != NULL)
     goto existed;
 
-  /* get recv_rtp pad and store */
+  /* get send_rtp pad and store */
   session->send_rtp_sink =
       gst_element_get_request_pad (session->session, "send_rtp_sink");
   if (session->send_rtp_sink == NULL)
@@ -907,8 +912,9 @@ create_send_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name)
   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result);
 
   /* get srcpad */
-  srcpad = gst_element_get_pad (session->session, "send_rtp_src");
-  if (srcpad == NULL)
+  session->send_rtp_src =
+      gst_element_get_static_pad (session->session, "send_rtp_src");
+  if (session->send_rtp_src == NULL)
     goto no_srcpad;
 
   /* ghost the new source pad */
@@ -916,7 +922,7 @@ create_send_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name)
   gname = g_strdup_printf ("send_rtp_src_%d", sessid);
   templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%d");
   srcghost =
-      gst_ghost_pad_new_from_template (gname, session->send_rtp_sink, templ);
+      gst_ghost_pad_new_from_template (gname, session->send_rtp_src, templ);
   gst_pad_set_active (srcghost, TRUE);
   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), srcghost);
   g_free (gname);
@@ -962,7 +968,7 @@ create_rtcp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name)
   GstRTPBinSession *session;
 
   /* first get the session number */
-  if (name == NULL || sscanf (name, "rtcp_src_%d", &sessid) != 1)
+  if (name == NULL || sscanf (name, "send_rtcp_src_%d", &sessid) != 1)
     goto no_name;
 
   /* get or create session */
@@ -971,16 +977,17 @@ create_rtcp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name)
     goto no_session;
 
   /* check if pad was requested */
-  if (session->rtcp_src != NULL)
+  if (session->send_rtcp_src != NULL)
     goto existed;
 
   /* get rtcp_src pad and store */
-  session->rtcp_src =
+  session->send_rtcp_src =
       gst_element_get_request_pad (session->session, "send_rtcp_src");
-  if (session->rtcp_src == NULL)
+  if (session->send_rtcp_src == NULL)
     goto pad_failed;
 
-  result = gst_ghost_pad_new_from_template (name, session->rtcp_src, templ);
+  result =
+      gst_ghost_pad_new_from_template (name, session->send_rtcp_src, templ);
   gst_pad_set_active (result, TRUE);
   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result);
 
@@ -999,7 +1006,8 @@ no_session:
   }
 existed:
   {
-    g_warning ("rtpbin: rtcp_src pad already requested for session %d", sessid);
+    g_warning ("rtpbin: send_rtcp_src pad already requested for session %d",
+        sessid);
     return NULL;
   }
 pad_failed:
@@ -1036,7 +1044,8 @@ gst_rtp_bin_request_new_pad (GstElement * element,
   } else if (templ == gst_element_class_get_pad_template (klass,
           "send_rtp_sink_%d")) {
     result = create_send_rtp (rtpbin, templ, name);
-  } else if (templ == gst_element_class_get_pad_template (klass, "rtcp_src_%d")) {
+  } else if (templ == gst_element_class_get_pad_template (klass,
+          "send_rtcp_src_%d")) {
     result = create_rtcp (rtpbin, templ, name);
   } else
     goto wrong_template;
index 247df14..40127f2 100644 (file)
@@ -258,6 +258,8 @@ gst_rtp_pt_demux_chain (GstPad * pad, GstBuffer * buf)
         &ret);
 
     caps = g_value_get_boxed (&ret);
+    if (caps == NULL)
+      caps = GST_PAD_CAPS (rtpdemux->sink);
     if (!caps)
       goto no_caps;
 
index 12a46fe..80f340a 100644 (file)
@@ -451,6 +451,8 @@ gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src,
   rtpsession = GST_RTP_SESSION (user_data);
   priv = rtpsession->priv;
 
+  GST_DEBUG_OBJECT (rtpsession, "reading receiving RTP packet");
+
   if (rtpsession->recv_rtp_src) {
     result = gst_pad_push (rtpsession->recv_rtp_src, buffer);
   } else {
@@ -473,6 +475,8 @@ gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src,
   rtpsession = GST_RTP_SESSION (user_data);
   priv = rtpsession->priv;
 
+  GST_DEBUG_OBJECT (rtpsession, "sending RTP packet");
+
   if (rtpsession->send_rtp_src) {
     result = gst_pad_push (rtpsession->send_rtp_src, buffer);
   } else {
@@ -737,7 +741,7 @@ create_recv_rtp_sink (GstRTPSession * rtpsession)
 
   rtpsession->recv_rtp_sink =
       gst_pad_new_from_static_template (&rtpsession_recv_rtp_sink_template,
-      NULL);
+      "recv_rtp_sink");
   gst_pad_set_chain_function (rtpsession->recv_rtp_sink,
       gst_rtp_session_chain_recv_rtp);
   gst_pad_set_event_function (rtpsession->recv_rtp_sink,
@@ -766,7 +770,7 @@ create_recv_rtcp_sink (GstRTPSession * rtpsession)
 
   rtpsession->recv_rtcp_sink =
       gst_pad_new_from_static_template (&rtpsession_recv_rtcp_sink_template,
-      NULL);
+      "recv_rtcp_sink");
   gst_pad_set_chain_function (rtpsession->recv_rtcp_sink,
       gst_rtp_session_chain_recv_rtcp);
   gst_pad_set_event_function (rtpsession->recv_rtcp_sink,
@@ -795,18 +799,18 @@ create_send_rtp_sink (GstRTPSession * rtpsession)
 
   rtpsession->send_rtp_sink =
       gst_pad_new_from_static_template (&rtpsession_send_rtp_sink_template,
-      NULL);
+      "send_rtp_sink");
   gst_pad_set_chain_function (rtpsession->send_rtp_sink,
       gst_rtp_session_chain_send_rtp);
   gst_pad_set_event_function (rtpsession->send_rtp_sink,
       gst_rtp_session_event_send_rtp_sink);
   gst_pad_set_active (rtpsession->send_rtp_sink, TRUE);
   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
-      rtpsession->recv_rtcp_sink);
+      rtpsession->send_rtp_sink);
 
   rtpsession->send_rtp_src =
       gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template,
-      NULL);
+      "send_rtp_src");
   gst_pad_set_active (rtpsession->send_rtp_src, TRUE);
   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src);
 
@@ -824,7 +828,7 @@ create_send_rtcp_src (GstRTPSession * rtpsession)
 
   rtpsession->send_rtcp_src =
       gst_pad_new_from_static_template (&rtpsession_send_rtcp_src_template,
-      NULL);
+      "send_rtcp_src");
   gst_pad_set_active (rtpsession->send_rtcp_src, TRUE);
   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
       rtpsession->send_rtcp_src);
index af418ab..8a7f3e7 100644 (file)
@@ -622,6 +622,7 @@ source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session)
   if (source == session->source) {
     GST_DEBUG ("source %08x pushed sender RTP packet", source->ssrc);
 
+    RTP_SESSION_UNLOCK (session);
 
     if (session->callbacks.send_rtp)
       result =
@@ -629,8 +630,11 @@ source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session)
           session->user_data);
     else
       gst_buffer_unref (buffer);
+
   } else {
     GST_DEBUG ("source %08x pushed receiver RTP packet", source->ssrc);
+    RTP_SESSION_UNLOCK (session);
+
     if (session->callbacks.process_rtp)
       result =
           session->callbacks.process_rtp (session, source, buffer,
@@ -638,6 +642,8 @@ source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session)
     else
       gst_buffer_unref (buffer);
   }
+  RTP_SESSION_LOCK (session);
+
   return result;
 }
 
@@ -877,6 +883,7 @@ rtp_session_create_source (RTPSession * sess)
   }
   source = rtp_source_new (ssrc);
   g_object_ref (source);
+  rtp_source_set_callbacks (source, &callbacks, sess);
   g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
       source);
   /* we have one more source now */
@@ -1080,6 +1087,8 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
     gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
         &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
 
+    GST_DEBUG ("RB %d: %08x, %u", i, ssrc, jitter);
+
     if (ssrc == sess->source->ssrc) {
       /* only deal with report blocks for our session, we update the stats of
        * the sender of the RTCP message. We could also compare our stats against
@@ -1361,7 +1370,8 @@ ignore:
  * @sess: an #RTPSession
  * @buffer: an RTP buffer
  *
- * Send the RTP buffer in the session manager.
+ * Send the RTP buffer in the session manager. This function takes ownership of
+ * @buffer.
  *
  * Returns: a #GstFlowReturn.
  */
@@ -1375,9 +1385,19 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer)
   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
 
+  if (!gst_rtp_buffer_validate (buffer))
+    goto invalid_packet;
+
+  GST_DEBUG ("received RTP packet for sending");
+
   RTP_SESSION_LOCK (sess);
   source = sess->source;
 
+  /* update last activity */
+  if (sess->callbacks.get_time)
+    source->last_rtp_activity =
+        sess->callbacks.get_time (sess, sess->user_data);
+
   prevsender = RTP_SOURCE_IS_SENDER (source);
 
   /* we use our own source to send */
@@ -1388,6 +1408,14 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer)
   RTP_SESSION_UNLOCK (sess);
 
   return result;
+
+  /* ERRORS */
+invalid_packet:
+  {
+    gst_buffer_unref (buffer);
+    GST_DEBUG ("invalid RTP packet received");
+    return GST_FLOW_OK;
+  }
 }
 
 static GstClockTime
@@ -1534,13 +1562,22 @@ session_start_rtcp (RTPSession * sess, ReportData * data)
   data->rtcp = gst_rtcp_buffer_new (sess->mtu);
 
   if (RTP_SOURCE_IS_SENDER (own)) {
+    guint64 ntptime;
+    guint32 rtptime;
+
     /* we are a sender, create SR */
     GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
     gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet);
 
-    /* fill in sender report info, FIXME NTP and RTP timestamps missing */
+    /* convert clock time to NTP time */
+    ntptime = gst_util_uint64_scale (data->time, (1LL << 32), GST_SECOND);
+    ntptime += (2208988800LL << 32);
+
+    rtptime = 0;
+
+    /* fill in sender report info, FIXME RTP timestamps missing */
     gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
-        0, 0, own->stats.packets_sent, own->stats.octets_sent);
+        ntptime, rtptime, own->stats.packets_sent, own->stats.octets_sent);
   } else {
     /* we are only receiver, create RR */
     GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
index c9a2114..9082d9d 100644 (file)
@@ -110,7 +110,7 @@ typedef GstClockTime (*RTPSessionGetTime) (RTPSession *sess, gpointer user_data)
  * @sess: an #RTPSession
  * @user_data: user data specified when registering
  *
- * This callback will be called when @sess needs to cancel the previous timeout. 
+ * This callback will be called when @sess needs to cancel the current timeout. 
  * The currently running timeout should be canceled and a new reporting interval
  * should be requested from @sess.
  */
@@ -122,6 +122,7 @@ typedef void (*RTPSessionReconsider) (RTPSession *sess, gpointer user_data);
  * @RTPSessionSendRTP: callback for sending RTP packets
  * @RTPSessionSendRTCP: callback for sending RTCP packets
  * @RTPSessionGetTime: callback for returning the current time
+ * @RTPSessionReconsider: callback for reconsidering the timeout
  *
  * These callbacks can be installed on the session manager to get notification
  * when RTP and RTCP packets are ready for further processing. These callbacks
index e1e6aac..7af7467 100644 (file)
@@ -453,18 +453,29 @@ GstFlowReturn
 rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer)
 {
   GstFlowReturn result = GST_FLOW_OK;
+  guint len;
 
   g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR);
   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
 
+  len = gst_rtp_buffer_get_payload_len (buffer);
+
   /* we are a sender now */
   src->is_sender = TRUE;
 
+  /* update stats for the SR */
+  src->stats.packets_sent++;
+  src->stats.octets_sent += len;
+
+
   /* push packet */
-  if (src->callbacks.push_rtp)
+  if (src->callbacks.push_rtp) {
+    GST_DEBUG ("pushing RTP packet %u", src->stats.packets_sent);
     result = src->callbacks.push_rtp (src, buffer, src->user_data);
-  else
+  } else {
+    GST_DEBUG ("no callback installed");
     gst_buffer_unref (buffer);
+  }
 
   return result;
 }