From 600afaaff9e489c9e3ea3d5546fd58568564be40 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 30 Apr 2007 13:41:30 +0000 Subject: [PATCH] gst/rtpmanager/async_jitter_queue.c: Fix the case where the buffer underruns and does not block. Original commit message from CVS: * gst/rtpmanager/async_jitter_queue.c: (signal_waiting_threads), (async_jitter_queue_pop_intern_unlocked): Fix the case where the buffer underruns and does not block. * gst/rtpmanager/gstrtpbin.c: (gst_rtp_bin_base_init), (create_recv_rtcp), (create_send_rtp), (create_rtcp), (gst_rtp_bin_request_new_pad): Rename RTCP send pad, like in the session manager. Allow getting an RTCP pad for receiving even if we don't receive RTP. fix handling of send_rtp_src pad. * gst/rtpmanager/gstrtpptdemux.c: (gst_rtp_pt_demux_chain): When no pt map could be found, fall back to the sinkpad caps. * gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_process_rtp), (gst_rtp_session_send_rtp), (create_recv_rtp_sink), (create_recv_rtcp_sink), (create_send_rtp_sink), (create_send_rtcp_src): Fix pad names. * gst/rtpmanager/rtpsession.c: (source_push_rtp), (rtp_session_create_source), (rtp_session_process_sr), (rtp_session_send_rtp), (session_start_rtcp): * gst/rtpmanager/rtpsession.h: Unlock session when performing a callback. Add callbacks for the internal session object. Fix sending of RTP packets. first attempt at adding NTP times in the SR packets. Small debug and doc improvements. * gst/rtpmanager/rtpsource.c: (rtp_source_send_rtp): Update stats for SR reports. --- gst/rtpmanager/async_jitter_queue.c | 29 +++++++++++++-------- gst/rtpmanager/gstrtpbin.c | 51 ++++++++++++++++++++++--------------- gst/rtpmanager/gstrtpptdemux.c | 2 ++ gst/rtpmanager/gstrtpsession.c | 16 +++++++----- gst/rtpmanager/rtpsession.c | 43 ++++++++++++++++++++++++++++--- gst/rtpmanager/rtpsession.h | 3 ++- gst/rtpmanager/rtpsource.c | 15 +++++++++-- 7 files changed, 116 insertions(+), 43 deletions(-) diff --git a/gst/rtpmanager/async_jitter_queue.c b/gst/rtpmanager/async_jitter_queue.c index 22a8ed0..81ba381 100644 --- a/gst/rtpmanager/async_jitter_queue.c +++ b/gst/rtpmanager/async_jitter_queue.c @@ -100,6 +100,7 @@ signal_waiting_threads (AsyncJitterQueue * queue) { if (async_jitter_queue_length_ts_units_unlocked (queue) >= queue->high_threshold * queue->max_queue_length) { + GST_DEBUG ("stop buffering"); queue->buffering = FALSE; } @@ -473,6 +474,7 @@ async_jitter_queue_pop_intern_unlocked (AsyncJitterQueue * queue) { gpointer retval; GstBuffer *tail_buffer = NULL; + guint tsunits; if (queue->pop_flushing) return NULL; @@ -485,20 +487,27 @@ async_jitter_queue_pop_intern_unlocked (AsyncJitterQueue * queue) return NULL; } - if (async_jitter_queue_length_ts_units_unlocked (queue) <= - queue->low_threshold * queue->max_queue_length + + tsunits = async_jitter_queue_length_ts_units_unlocked (queue); + + GST_DEBUG ("tsunits %u, pops: %u, limit %d", tsunits, queue->pops_remaining, + queue->low_threshold * queue->max_queue_length); + + if (tsunits <= queue->low_threshold * queue->max_queue_length && queue->pops_remaining == 0) { if (!queue->buffering) { + GST_DEBUG ("start buffering"); queue->buffering = TRUE; queue->pops_remaining = queue->queue->length; - } else { - while (!g_queue_peek_tail (queue->queue) || queue->pop_blocking) { - queue->waiting_threads++; - g_cond_wait (queue->cond, queue->mutex); - queue->waiting_threads--; - if (queue->pop_flushing) - return NULL; - } + } + + GST_DEBUG ("wait for data"); + while (!g_queue_peek_tail (queue->queue) || queue->pop_blocking) { + queue->waiting_threads++; + g_cond_wait (queue->cond, queue->mutex); + queue->waiting_threads--; + if (queue->pop_flushing) + return NULL; } } diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index eb82739..927755f 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -84,8 +84,8 @@ GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%d_%d_%d", GST_STATIC_CAPS ("application/x-rtp") ); -static GstStaticPadTemplate rtpbin_rtcp_src_template = -GST_STATIC_PAD_TEMPLATE ("rtcp_src_%d", +static GstStaticPadTemplate rtpbin_send_rtcp_src_template = +GST_STATIC_PAD_TEMPLATE ("send_rtcp_src_%d", GST_PAD_SRC, GST_PAD_REQUEST, GST_STATIC_CAPS ("application/x-rtcp") @@ -195,7 +195,7 @@ struct _GstRTPBinSession GstPad *recv_rtcp_src; GstPad *send_rtp_sink; GstPad *send_rtp_src; - GstPad *rtcp_src; + GstPad *send_rtcp_src; }; /* find a session with the given id. Must be called with RTP_BIN_LOCK */ @@ -432,7 +432,7 @@ gst_rtp_bin_base_init (gpointer klass) gst_element_class_add_pad_template (element_class, gst_static_pad_template_get (&rtpbin_recv_rtp_src_template)); gst_element_class_add_pad_template (element_class, - gst_static_pad_template_get (&rtpbin_rtcp_src_template)); + gst_static_pad_template_get (&rtpbin_send_rtcp_src_template)); gst_element_class_add_pad_template (element_class, gst_static_pad_template_get (&rtpbin_send_rtp_src_template)); @@ -795,10 +795,15 @@ create_recv_rtcp (GstRTPBin * rtpbin, GstPadTemplate * templ, GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid); - /* get the session, it must exist or we error */ + /* get or create the session */ session = find_session_by_id (rtpbin, sessid); - if (!session) - goto no_session; + if (!session) { + GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid); + /* create session now */ + session = create_session (rtpbin, sessid); + if (session == NULL) + goto create_error; + } /* check if pad was requested */ if (session->recv_rtcp_sink != NULL) @@ -841,9 +846,9 @@ no_name: g_warning ("rtpbin: invalid name given"); return NULL; } -no_session: +create_error: { - g_warning ("rtpbin: no session with id %d", sessid); + /* create_session already warned */ return NULL; } existed: @@ -872,7 +877,7 @@ link_failed: static GstPad * create_send_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name) { - GstPad *result, *srcpad, *srcghost; + GstPad *result, *srcghost; gchar *gname; guint sessid; GstRTPBinSession *session; @@ -895,7 +900,7 @@ create_send_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name) if (session->send_rtp_sink != NULL) goto existed; - /* get recv_rtp pad and store */ + /* get send_rtp pad and store */ session->send_rtp_sink = gst_element_get_request_pad (session->session, "send_rtp_sink"); if (session->send_rtp_sink == NULL) @@ -907,8 +912,9 @@ create_send_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name) gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result); /* get srcpad */ - srcpad = gst_element_get_pad (session->session, "send_rtp_src"); - if (srcpad == NULL) + session->send_rtp_src = + gst_element_get_static_pad (session->session, "send_rtp_src"); + if (session->send_rtp_src == NULL) goto no_srcpad; /* ghost the new source pad */ @@ -916,7 +922,7 @@ create_send_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name) gname = g_strdup_printf ("send_rtp_src_%d", sessid); templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%d"); srcghost = - gst_ghost_pad_new_from_template (gname, session->send_rtp_sink, templ); + gst_ghost_pad_new_from_template (gname, session->send_rtp_src, templ); gst_pad_set_active (srcghost, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), srcghost); g_free (gname); @@ -962,7 +968,7 @@ create_rtcp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name) GstRTPBinSession *session; /* first get the session number */ - if (name == NULL || sscanf (name, "rtcp_src_%d", &sessid) != 1) + if (name == NULL || sscanf (name, "send_rtcp_src_%d", &sessid) != 1) goto no_name; /* get or create session */ @@ -971,16 +977,17 @@ create_rtcp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name) goto no_session; /* check if pad was requested */ - if (session->rtcp_src != NULL) + if (session->send_rtcp_src != NULL) goto existed; /* get rtcp_src pad and store */ - session->rtcp_src = + session->send_rtcp_src = gst_element_get_request_pad (session->session, "send_rtcp_src"); - if (session->rtcp_src == NULL) + if (session->send_rtcp_src == NULL) goto pad_failed; - result = gst_ghost_pad_new_from_template (name, session->rtcp_src, templ); + result = + gst_ghost_pad_new_from_template (name, session->send_rtcp_src, templ); gst_pad_set_active (result, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result); @@ -999,7 +1006,8 @@ no_session: } existed: { - g_warning ("rtpbin: rtcp_src pad already requested for session %d", sessid); + g_warning ("rtpbin: send_rtcp_src pad already requested for session %d", + sessid); return NULL; } pad_failed: @@ -1036,7 +1044,8 @@ gst_rtp_bin_request_new_pad (GstElement * element, } else if (templ == gst_element_class_get_pad_template (klass, "send_rtp_sink_%d")) { result = create_send_rtp (rtpbin, templ, name); - } else if (templ == gst_element_class_get_pad_template (klass, "rtcp_src_%d")) { + } else if (templ == gst_element_class_get_pad_template (klass, + "send_rtcp_src_%d")) { result = create_rtcp (rtpbin, templ, name); } else goto wrong_template; diff --git a/gst/rtpmanager/gstrtpptdemux.c b/gst/rtpmanager/gstrtpptdemux.c index 247df14..40127f2 100644 --- a/gst/rtpmanager/gstrtpptdemux.c +++ b/gst/rtpmanager/gstrtpptdemux.c @@ -258,6 +258,8 @@ gst_rtp_pt_demux_chain (GstPad * pad, GstBuffer * buf) &ret); caps = g_value_get_boxed (&ret); + if (caps == NULL) + caps = GST_PAD_CAPS (rtpdemux->sink); if (!caps) goto no_caps; diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c index 12a46fe..80f340a 100644 --- a/gst/rtpmanager/gstrtpsession.c +++ b/gst/rtpmanager/gstrtpsession.c @@ -451,6 +451,8 @@ gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src, rtpsession = GST_RTP_SESSION (user_data); priv = rtpsession->priv; + GST_DEBUG_OBJECT (rtpsession, "reading receiving RTP packet"); + if (rtpsession->recv_rtp_src) { result = gst_pad_push (rtpsession->recv_rtp_src, buffer); } else { @@ -473,6 +475,8 @@ gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src, rtpsession = GST_RTP_SESSION (user_data); priv = rtpsession->priv; + GST_DEBUG_OBJECT (rtpsession, "sending RTP packet"); + if (rtpsession->send_rtp_src) { result = gst_pad_push (rtpsession->send_rtp_src, buffer); } else { @@ -737,7 +741,7 @@ create_recv_rtp_sink (GstRTPSession * rtpsession) rtpsession->recv_rtp_sink = gst_pad_new_from_static_template (&rtpsession_recv_rtp_sink_template, - NULL); + "recv_rtp_sink"); gst_pad_set_chain_function (rtpsession->recv_rtp_sink, gst_rtp_session_chain_recv_rtp); gst_pad_set_event_function (rtpsession->recv_rtp_sink, @@ -766,7 +770,7 @@ create_recv_rtcp_sink (GstRTPSession * rtpsession) rtpsession->recv_rtcp_sink = gst_pad_new_from_static_template (&rtpsession_recv_rtcp_sink_template, - NULL); + "recv_rtcp_sink"); gst_pad_set_chain_function (rtpsession->recv_rtcp_sink, gst_rtp_session_chain_recv_rtcp); gst_pad_set_event_function (rtpsession->recv_rtcp_sink, @@ -795,18 +799,18 @@ create_send_rtp_sink (GstRTPSession * rtpsession) rtpsession->send_rtp_sink = gst_pad_new_from_static_template (&rtpsession_send_rtp_sink_template, - NULL); + "send_rtp_sink"); gst_pad_set_chain_function (rtpsession->send_rtp_sink, gst_rtp_session_chain_send_rtp); gst_pad_set_event_function (rtpsession->send_rtp_sink, gst_rtp_session_event_send_rtp_sink); gst_pad_set_active (rtpsession->send_rtp_sink, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), - rtpsession->recv_rtcp_sink); + rtpsession->send_rtp_sink); rtpsession->send_rtp_src = gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template, - NULL); + "send_rtp_src"); gst_pad_set_active (rtpsession->send_rtp_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src); @@ -824,7 +828,7 @@ create_send_rtcp_src (GstRTPSession * rtpsession) rtpsession->send_rtcp_src = gst_pad_new_from_static_template (&rtpsession_send_rtcp_src_template, - NULL); + "send_rtcp_src"); gst_pad_set_active (rtpsession->send_rtcp_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtcp_src); diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c index af418ab..8a7f3e7 100644 --- a/gst/rtpmanager/rtpsession.c +++ b/gst/rtpmanager/rtpsession.c @@ -622,6 +622,7 @@ source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session) if (source == session->source) { GST_DEBUG ("source %08x pushed sender RTP packet", source->ssrc); + RTP_SESSION_UNLOCK (session); if (session->callbacks.send_rtp) result = @@ -629,8 +630,11 @@ source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session) session->user_data); else gst_buffer_unref (buffer); + } else { GST_DEBUG ("source %08x pushed receiver RTP packet", source->ssrc); + RTP_SESSION_UNLOCK (session); + if (session->callbacks.process_rtp) result = session->callbacks.process_rtp (session, source, buffer, @@ -638,6 +642,8 @@ source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session) else gst_buffer_unref (buffer); } + RTP_SESSION_LOCK (session); + return result; } @@ -877,6 +883,7 @@ rtp_session_create_source (RTPSession * sess) } source = rtp_source_new (ssrc); g_object_ref (source); + rtp_source_set_callbacks (source, &callbacks, sess); g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc), source); /* we have one more source now */ @@ -1080,6 +1087,8 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet, gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost, &packetslost, &exthighestseq, &jitter, &lsr, &dlsr); + GST_DEBUG ("RB %d: %08x, %u", i, ssrc, jitter); + if (ssrc == sess->source->ssrc) { /* only deal with report blocks for our session, we update the stats of * the sender of the RTCP message. We could also compare our stats against @@ -1361,7 +1370,8 @@ ignore: * @sess: an #RTPSession * @buffer: an RTP buffer * - * Send the RTP buffer in the session manager. + * Send the RTP buffer in the session manager. This function takes ownership of + * @buffer. * * Returns: a #GstFlowReturn. */ @@ -1375,9 +1385,19 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer) g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); + if (!gst_rtp_buffer_validate (buffer)) + goto invalid_packet; + + GST_DEBUG ("received RTP packet for sending"); + RTP_SESSION_LOCK (sess); source = sess->source; + /* update last activity */ + if (sess->callbacks.get_time) + source->last_rtp_activity = + sess->callbacks.get_time (sess, sess->user_data); + prevsender = RTP_SOURCE_IS_SENDER (source); /* we use our own source to send */ @@ -1388,6 +1408,14 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer) RTP_SESSION_UNLOCK (sess); return result; + + /* ERRORS */ +invalid_packet: + { + gst_buffer_unref (buffer); + GST_DEBUG ("invalid RTP packet received"); + return GST_FLOW_OK; + } } static GstClockTime @@ -1534,13 +1562,22 @@ session_start_rtcp (RTPSession * sess, ReportData * data) data->rtcp = gst_rtcp_buffer_new (sess->mtu); if (RTP_SOURCE_IS_SENDER (own)) { + guint64 ntptime; + guint32 rtptime; + /* we are a sender, create SR */ GST_DEBUG ("create SR for SSRC %08x", own->ssrc); gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet); - /* fill in sender report info, FIXME NTP and RTP timestamps missing */ + /* convert clock time to NTP time */ + ntptime = gst_util_uint64_scale (data->time, (1LL << 32), GST_SECOND); + ntptime += (2208988800LL << 32); + + rtptime = 0; + + /* fill in sender report info, FIXME RTP timestamps missing */ gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc, - 0, 0, own->stats.packets_sent, own->stats.octets_sent); + ntptime, rtptime, own->stats.packets_sent, own->stats.octets_sent); } else { /* we are only receiver, create RR */ GST_DEBUG ("create RR for SSRC %08x", own->ssrc); diff --git a/gst/rtpmanager/rtpsession.h b/gst/rtpmanager/rtpsession.h index c9a2114..9082d9d 100644 --- a/gst/rtpmanager/rtpsession.h +++ b/gst/rtpmanager/rtpsession.h @@ -110,7 +110,7 @@ typedef GstClockTime (*RTPSessionGetTime) (RTPSession *sess, gpointer user_data) * @sess: an #RTPSession * @user_data: user data specified when registering * - * This callback will be called when @sess needs to cancel the previous timeout. + * This callback will be called when @sess needs to cancel the current timeout. * The currently running timeout should be canceled and a new reporting interval * should be requested from @sess. */ @@ -122,6 +122,7 @@ typedef void (*RTPSessionReconsider) (RTPSession *sess, gpointer user_data); * @RTPSessionSendRTP: callback for sending RTP packets * @RTPSessionSendRTCP: callback for sending RTCP packets * @RTPSessionGetTime: callback for returning the current time + * @RTPSessionReconsider: callback for reconsidering the timeout * * These callbacks can be installed on the session manager to get notification * when RTP and RTCP packets are ready for further processing. These callbacks diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c index e1e6aac..7af7467 100644 --- a/gst/rtpmanager/rtpsource.c +++ b/gst/rtpmanager/rtpsource.c @@ -453,18 +453,29 @@ GstFlowReturn rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer) { GstFlowReturn result = GST_FLOW_OK; + guint len; g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); + len = gst_rtp_buffer_get_payload_len (buffer); + /* we are a sender now */ src->is_sender = TRUE; + /* update stats for the SR */ + src->stats.packets_sent++; + src->stats.octets_sent += len; + + /* push packet */ - if (src->callbacks.push_rtp) + if (src->callbacks.push_rtp) { + GST_DEBUG ("pushing RTP packet %u", src->stats.packets_sent); result = src->callbacks.push_rtp (src, buffer, src->user_data); - else + } else { + GST_DEBUG ("no callback installed"); gst_buffer_unref (buffer); + } return result; } -- 2.7.4