{
if (async_jitter_queue_length_ts_units_unlocked (queue) >=
queue->high_threshold * queue->max_queue_length) {
+ GST_DEBUG ("stop buffering");
queue->buffering = FALSE;
}
{
gpointer retval;
GstBuffer *tail_buffer = NULL;
+ guint tsunits;
if (queue->pop_flushing)
return NULL;
return NULL;
}
- if (async_jitter_queue_length_ts_units_unlocked (queue) <=
- queue->low_threshold * queue->max_queue_length
+
+ tsunits = async_jitter_queue_length_ts_units_unlocked (queue);
+
+ GST_DEBUG ("tsunits %u, pops: %u, limit %d", tsunits, queue->pops_remaining,
+ queue->low_threshold * queue->max_queue_length);
+
+ if (tsunits <= queue->low_threshold * queue->max_queue_length
&& queue->pops_remaining == 0) {
if (!queue->buffering) {
+ GST_DEBUG ("start buffering");
queue->buffering = TRUE;
queue->pops_remaining = queue->queue->length;
- } else {
- while (!g_queue_peek_tail (queue->queue) || queue->pop_blocking) {
- queue->waiting_threads++;
- g_cond_wait (queue->cond, queue->mutex);
- queue->waiting_threads--;
- if (queue->pop_flushing)
- return NULL;
- }
+ }
+
+ GST_DEBUG ("wait for data");
+ while (!g_queue_peek_tail (queue->queue) || queue->pop_blocking) {
+ queue->waiting_threads++;
+ g_cond_wait (queue->cond, queue->mutex);
+ queue->waiting_threads--;
+ if (queue->pop_flushing)
+ return NULL;
}
}
GST_STATIC_CAPS ("application/x-rtp")
);
-static GstStaticPadTemplate rtpbin_rtcp_src_template =
-GST_STATIC_PAD_TEMPLATE ("rtcp_src_%d",
+static GstStaticPadTemplate rtpbin_send_rtcp_src_template =
+GST_STATIC_PAD_TEMPLATE ("send_rtcp_src_%d",
GST_PAD_SRC,
GST_PAD_REQUEST,
GST_STATIC_CAPS ("application/x-rtcp")
GstPad *recv_rtcp_src;
GstPad *send_rtp_sink;
GstPad *send_rtp_src;
- GstPad *rtcp_src;
+ GstPad *send_rtcp_src;
};
/* find a session with the given id. Must be called with RTP_BIN_LOCK */
gst_element_class_add_pad_template (element_class,
gst_static_pad_template_get (&rtpbin_recv_rtp_src_template));
gst_element_class_add_pad_template (element_class,
- gst_static_pad_template_get (&rtpbin_rtcp_src_template));
+ gst_static_pad_template_get (&rtpbin_send_rtcp_src_template));
gst_element_class_add_pad_template (element_class,
gst_static_pad_template_get (&rtpbin_send_rtp_src_template));
GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid);
- /* get the session, it must exist or we error */
+ /* get or create the session */
session = find_session_by_id (rtpbin, sessid);
- if (!session)
- goto no_session;
+ if (!session) {
+ GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid);
+ /* create session now */
+ session = create_session (rtpbin, sessid);
+ if (session == NULL)
+ goto create_error;
+ }
/* check if pad was requested */
if (session->recv_rtcp_sink != NULL)
g_warning ("rtpbin: invalid name given");
return NULL;
}
-no_session:
+create_error:
{
- g_warning ("rtpbin: no session with id %d", sessid);
+ /* create_session already warned */
return NULL;
}
existed:
static GstPad *
create_send_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name)
{
- GstPad *result, *srcpad, *srcghost;
+ GstPad *result, *srcghost;
gchar *gname;
guint sessid;
GstRTPBinSession *session;
if (session->send_rtp_sink != NULL)
goto existed;
- /* get recv_rtp pad and store */
+ /* get send_rtp pad and store */
session->send_rtp_sink =
gst_element_get_request_pad (session->session, "send_rtp_sink");
if (session->send_rtp_sink == NULL)
gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result);
/* get srcpad */
- srcpad = gst_element_get_pad (session->session, "send_rtp_src");
- if (srcpad == NULL)
+ session->send_rtp_src =
+ gst_element_get_static_pad (session->session, "send_rtp_src");
+ if (session->send_rtp_src == NULL)
goto no_srcpad;
/* ghost the new source pad */
gname = g_strdup_printf ("send_rtp_src_%d", sessid);
templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%d");
srcghost =
- gst_ghost_pad_new_from_template (gname, session->send_rtp_sink, templ);
+ gst_ghost_pad_new_from_template (gname, session->send_rtp_src, templ);
gst_pad_set_active (srcghost, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), srcghost);
g_free (gname);
GstRTPBinSession *session;
/* first get the session number */
- if (name == NULL || sscanf (name, "rtcp_src_%d", &sessid) != 1)
+ if (name == NULL || sscanf (name, "send_rtcp_src_%d", &sessid) != 1)
goto no_name;
/* get or create session */
goto no_session;
/* check if pad was requested */
- if (session->rtcp_src != NULL)
+ if (session->send_rtcp_src != NULL)
goto existed;
/* get rtcp_src pad and store */
- session->rtcp_src =
+ session->send_rtcp_src =
gst_element_get_request_pad (session->session, "send_rtcp_src");
- if (session->rtcp_src == NULL)
+ if (session->send_rtcp_src == NULL)
goto pad_failed;
- result = gst_ghost_pad_new_from_template (name, session->rtcp_src, templ);
+ result =
+ gst_ghost_pad_new_from_template (name, session->send_rtcp_src, templ);
gst_pad_set_active (result, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result);
}
existed:
{
- g_warning ("rtpbin: rtcp_src pad already requested for session %d", sessid);
+ g_warning ("rtpbin: send_rtcp_src pad already requested for session %d",
+ sessid);
return NULL;
}
pad_failed:
} else if (templ == gst_element_class_get_pad_template (klass,
"send_rtp_sink_%d")) {
result = create_send_rtp (rtpbin, templ, name);
- } else if (templ == gst_element_class_get_pad_template (klass, "rtcp_src_%d")) {
+ } else if (templ == gst_element_class_get_pad_template (klass,
+ "send_rtcp_src_%d")) {
result = create_rtcp (rtpbin, templ, name);
} else
goto wrong_template;
&ret);
caps = g_value_get_boxed (&ret);
+ if (caps == NULL)
+ caps = GST_PAD_CAPS (rtpdemux->sink);
if (!caps)
goto no_caps;
rtpsession = GST_RTP_SESSION (user_data);
priv = rtpsession->priv;
+ GST_DEBUG_OBJECT (rtpsession, "reading receiving RTP packet");
+
if (rtpsession->recv_rtp_src) {
result = gst_pad_push (rtpsession->recv_rtp_src, buffer);
} else {
rtpsession = GST_RTP_SESSION (user_data);
priv = rtpsession->priv;
+ GST_DEBUG_OBJECT (rtpsession, "sending RTP packet");
+
if (rtpsession->send_rtp_src) {
result = gst_pad_push (rtpsession->send_rtp_src, buffer);
} else {
rtpsession->recv_rtp_sink =
gst_pad_new_from_static_template (&rtpsession_recv_rtp_sink_template,
- NULL);
+ "recv_rtp_sink");
gst_pad_set_chain_function (rtpsession->recv_rtp_sink,
gst_rtp_session_chain_recv_rtp);
gst_pad_set_event_function (rtpsession->recv_rtp_sink,
rtpsession->recv_rtcp_sink =
gst_pad_new_from_static_template (&rtpsession_recv_rtcp_sink_template,
- NULL);
+ "recv_rtcp_sink");
gst_pad_set_chain_function (rtpsession->recv_rtcp_sink,
gst_rtp_session_chain_recv_rtcp);
gst_pad_set_event_function (rtpsession->recv_rtcp_sink,
rtpsession->send_rtp_sink =
gst_pad_new_from_static_template (&rtpsession_send_rtp_sink_template,
- NULL);
+ "send_rtp_sink");
gst_pad_set_chain_function (rtpsession->send_rtp_sink,
gst_rtp_session_chain_send_rtp);
gst_pad_set_event_function (rtpsession->send_rtp_sink,
gst_rtp_session_event_send_rtp_sink);
gst_pad_set_active (rtpsession->send_rtp_sink, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
- rtpsession->recv_rtcp_sink);
+ rtpsession->send_rtp_sink);
rtpsession->send_rtp_src =
gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template,
- NULL);
+ "send_rtp_src");
gst_pad_set_active (rtpsession->send_rtp_src, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src);
rtpsession->send_rtcp_src =
gst_pad_new_from_static_template (&rtpsession_send_rtcp_src_template,
- NULL);
+ "send_rtcp_src");
gst_pad_set_active (rtpsession->send_rtcp_src, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
rtpsession->send_rtcp_src);
if (source == session->source) {
GST_DEBUG ("source %08x pushed sender RTP packet", source->ssrc);
+ RTP_SESSION_UNLOCK (session);
if (session->callbacks.send_rtp)
result =
session->user_data);
else
gst_buffer_unref (buffer);
+
} else {
GST_DEBUG ("source %08x pushed receiver RTP packet", source->ssrc);
+ RTP_SESSION_UNLOCK (session);
+
if (session->callbacks.process_rtp)
result =
session->callbacks.process_rtp (session, source, buffer,
else
gst_buffer_unref (buffer);
}
+ RTP_SESSION_LOCK (session);
+
return result;
}
}
source = rtp_source_new (ssrc);
g_object_ref (source);
+ rtp_source_set_callbacks (source, &callbacks, sess);
g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
source);
/* we have one more source now */
gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
&packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
+ GST_DEBUG ("RB %d: %08x, %u", i, ssrc, jitter);
+
if (ssrc == sess->source->ssrc) {
/* only deal with report blocks for our session, we update the stats of
* the sender of the RTCP message. We could also compare our stats against
* @sess: an #RTPSession
* @buffer: an RTP buffer
*
- * Send the RTP buffer in the session manager.
+ * Send the RTP buffer in the session manager. This function takes ownership of
+ * @buffer.
*
* Returns: a #GstFlowReturn.
*/
g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
+ if (!gst_rtp_buffer_validate (buffer))
+ goto invalid_packet;
+
+ GST_DEBUG ("received RTP packet for sending");
+
RTP_SESSION_LOCK (sess);
source = sess->source;
+ /* update last activity */
+ if (sess->callbacks.get_time)
+ source->last_rtp_activity =
+ sess->callbacks.get_time (sess, sess->user_data);
+
prevsender = RTP_SOURCE_IS_SENDER (source);
/* we use our own source to send */
RTP_SESSION_UNLOCK (sess);
return result;
+
+ /* ERRORS */
+invalid_packet:
+ {
+ gst_buffer_unref (buffer);
+ GST_DEBUG ("invalid RTP packet received");
+ return GST_FLOW_OK;
+ }
}
static GstClockTime
data->rtcp = gst_rtcp_buffer_new (sess->mtu);
if (RTP_SOURCE_IS_SENDER (own)) {
+ guint64 ntptime;
+ guint32 rtptime;
+
/* we are a sender, create SR */
GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet);
- /* fill in sender report info, FIXME NTP and RTP timestamps missing */
+ /* convert clock time to NTP time */
+ ntptime = gst_util_uint64_scale (data->time, (1LL << 32), GST_SECOND);
+ ntptime += (2208988800LL << 32);
+
+ rtptime = 0;
+
+ /* fill in sender report info, FIXME RTP timestamps missing */
gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
- 0, 0, own->stats.packets_sent, own->stats.octets_sent);
+ ntptime, rtptime, own->stats.packets_sent, own->stats.octets_sent);
} else {
/* we are only receiver, create RR */
GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
* @sess: an #RTPSession
* @user_data: user data specified when registering
*
- * This callback will be called when @sess needs to cancel the previous timeout.
+ * This callback will be called when @sess needs to cancel the current timeout.
* The currently running timeout should be canceled and a new reporting interval
* should be requested from @sess.
*/
* @RTPSessionSendRTP: callback for sending RTP packets
* @RTPSessionSendRTCP: callback for sending RTCP packets
* @RTPSessionGetTime: callback for returning the current time
+ * @RTPSessionReconsider: callback for reconsidering the timeout
*
* These callbacks can be installed on the session manager to get notification
* when RTP and RTCP packets are ready for further processing. These callbacks
rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer)
{
GstFlowReturn result = GST_FLOW_OK;
+ guint len;
g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
+ len = gst_rtp_buffer_get_payload_len (buffer);
+
/* we are a sender now */
src->is_sender = TRUE;
+ /* update stats for the SR */
+ src->stats.packets_sent++;
+ src->stats.octets_sent += len;
+
+
/* push packet */
- if (src->callbacks.push_rtp)
+ if (src->callbacks.push_rtp) {
+ GST_DEBUG ("pushing RTP packet %u", src->stats.packets_sent);
result = src->callbacks.push_rtp (src, buffer, src->user_data);
- else
+ } else {
+ GST_DEBUG ("no callback installed");
gst_buffer_unref (buffer);
+ }
return result;
}