struct _GstRtpSessionPrivate
{
GMutex *lock;
+ GstClock *sysclock;
RTPSession *session;
/* thread for sending out RTCP */
{
rtpsession->priv = GST_RTP_SESSION_GET_PRIVATE (rtpsession);
rtpsession->priv->lock = g_mutex_new ();
+ rtpsession->priv->sysclock = gst_system_clock_obtain ();
rtpsession->priv->session = rtp_session_new ();
/* configure callbacks */
rtp_session_set_callbacks (rtpsession->priv->session, &callbacks, rtpsession);
g_hash_table_destroy (rtpsession->priv->ptmap);
g_mutex_free (rtpsession->priv->lock);
+ g_object_unref (rtpsession->priv->sysclock);
g_object_unref (rtpsession->priv->session);
G_OBJECT_CLASS (parent_class)->finalize (object);
static void
rtcp_thread (GstRtpSession * rtpsession)
{
- GstClock *sysclock;
GstClockID id;
GstClockTime current_time;
GstClockTime next_timeout;
guint64 ntpnstime;
- /* for RTCP timeouts we use the system clock */
- sysclock = gst_system_clock_obtain ();
- if (sysclock == NULL)
- goto no_sysclock;
-
- current_time = gst_clock_get_time (sysclock);
-
GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread");
GST_RTP_SESSION_LOCK (rtpsession);
+ current_time = gst_clock_get_time (rtpsession->priv->sysclock);
+
while (!rtpsession->priv->stop_thread) {
GstClockReturn res;
break;
id = rtpsession->priv->id =
- gst_clock_new_single_shot_id (sysclock, next_timeout);
+ gst_clock_new_single_shot_id (rtpsession->priv->sysclock, next_timeout);
GST_RTP_SESSION_UNLOCK (rtpsession);
res = gst_clock_id_wait (id, NULL);
break;
/* update current time */
- current_time = gst_clock_get_time (sysclock);
+ current_time = gst_clock_get_time (rtpsession->priv->sysclock);
/* get current NTP time */
ntpnstime = get_current_ntp_ns_time (rtpsession);
rtpsession->priv->thread_stopped = TRUE;
GST_RTP_SESSION_UNLOCK (rtpsession);
- gst_object_unref (sysclock);
-
GST_DEBUG_OBJECT (rtpsession, "leaving RTCP thread");
- return;
-
- /* ERRORS */
-no_sysclock:
- {
- GST_ELEMENT_ERROR (rtpsession, CORE, CLOCK, (NULL),
- ("Could not get system clock"));
- return;
- }
}
static gboolean
GstRtpSession *rtpsession;
GstRtpSessionPrivate *priv;
GstFlowReturn ret;
+ GstClockTime current_time;
guint64 ntpnstime;
GstClockTime timestamp;
ntpnstime = get_current_ntp_ns_time (rtpsession);
}
- ret = rtp_session_process_rtp (priv->session, buffer, ntpnstime);
+ current_time = gst_clock_get_time (priv->sysclock);
+ ret = rtp_session_process_rtp (priv->session, buffer, current_time,
+ ntpnstime);
if (ret != GST_FLOW_OK)
goto push_error;
{
GstRtpSession *rtpsession;
GstRtpSessionPrivate *priv;
+ GstClockTime current_time;
GstFlowReturn ret;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
GST_DEBUG_OBJECT (rtpsession, "received RTCP packet");
- ret = rtp_session_process_rtcp (priv->session, buffer);
+ current_time = gst_clock_get_time (priv->sysclock);
+ ret = rtp_session_process_rtcp (priv->session, buffer, current_time);
gst_object_unref (rtpsession);
gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
break;
- case GST_EVENT_NEWSEGMENT:
- {
+ case GST_EVENT_NEWSEGMENT:{
gboolean update;
gdouble rate, arate;
GstFormat format;
ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
break;
}
- case GST_EVENT_EOS:
+ case GST_EVENT_EOS:{
+ GstClockTime current_time;
+
ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
- rtp_session_send_bye (rtpsession->priv->session, "End of stream");
+ current_time = gst_clock_get_time (rtpsession->priv->sysclock);
+ rtp_session_send_bye (rtpsession->priv->session, "End of stream",
+ current_time);
break;
+ }
default:
ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
break;
GstRtpSessionPrivate *priv;
GstFlowReturn ret;
GstClockTime timestamp;
+ GstClockTime current_time;
guint64 ntpnstime;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
ntpnstime = -1;
}
- ret = rtp_session_send_rtp (priv->session, buffer, ntpnstime);
+ current_time = gst_clock_get_time (priv->sysclock);
+ ret = rtp_session_send_rtp (priv->session, buffer, current_time, ntpnstime);
if (ret != GST_FLOW_OK)
goto push_error;
static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
gboolean * created, RTPArrivalStats * arrival, gboolean rtp);
static GstFlowReturn rtp_session_send_bye_locked (RTPSession * sess,
- const gchar * reason);
+ const gchar * reason, GstClockTime current_time);
static GstClockTime calculate_rtcp_interval (RTPSession * sess,
gboolean deterministic, gboolean first);
GST_DEBUG ("Collision for SSRC %x", rtp_source_get_ssrc (source));
on_ssrc_collision (sess, source);
- rtp_session_send_bye_locked (sess, "SSRC Collision");
+ rtp_session_send_bye_locked (sess, "SSRC Collision", arrival->time);
sess->change_ssrc = TRUE;
}
*/
static void
update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
- gboolean rtp, GstBuffer * buffer, guint64 ntpnstime)
+ gboolean rtp, GstBuffer * buffer, GstClockTime current_time,
+ guint64 ntpnstime)
{
- GTimeVal current;
-
/* get time of arrival */
- g_get_current_time (¤t);
- arrival->time = GST_TIMEVAL_TO_TIME (current);
+ arrival->time = current_time;
arrival->timestamp = GST_BUFFER_TIMESTAMP (buffer);
arrival->ntpnstime = ntpnstime;
* rtp_session_process_rtp:
* @sess: and #RTPSession
* @buffer: an RTP buffer
+ * @current_time: the current system time
* @ntpnstime: the NTP arrival time in nanoseconds
*
* Process an RTP buffer in the session manager. This function takes ownership
*/
GstFlowReturn
rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
- guint64 ntpnstime)
+ GstClockTime current_time, guint64 ntpnstime)
{
GstFlowReturn result;
guint32 ssrc;
RTP_SESSION_LOCK (sess);
/* update arrival stats */
- update_arrival_stats (sess, &arrival, TRUE, buffer, ntpnstime);
+ update_arrival_stats (sess, &arrival, TRUE, buffer, current_time, ntpnstime);
/* ignore more RTP packets when we left the session */
if (sess->source->received_bye)
* rtp_session_process_rtcp:
* @sess: and #RTPSession
* @buffer: an RTCP buffer
+ * @current_time: the current system time
*
* Process an RTCP buffer in the session manager. This function takes ownership
* of @buffer.
* Returns: a #GstFlowReturn.
*/
GstFlowReturn
-rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer)
+rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
+ GstClockTime current_time)
{
GstRTCPPacket packet;
gboolean more, is_bye = FALSE, is_sr = FALSE;
RTP_SESSION_LOCK (sess);
/* update arrival stats */
- update_arrival_stats (sess, &arrival, FALSE, buffer, -1);
+ update_arrival_stats (sess, &arrival, FALSE, buffer, current_time, -1);
if (sess->sent_bye)
goto ignore;
* rtp_session_send_rtp:
* @sess: an #RTPSession
* @buffer: an RTP buffer
+ * @current_time: the current system time
* @ntpnstime: the NTP time in nanoseconds of when this buffer was captured.
*
* Send the RTP buffer in the session manager. This function takes ownership of
* Returns: a #GstFlowReturn.
*/
GstFlowReturn
-rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer, guint64 ntpnstime)
+rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer,
+ GstClockTime current_time, guint64 ntpnstime)
{
GstFlowReturn result;
RTPSource *source;
gboolean prevsender;
- GTimeVal current;
g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
source = sess->source;
/* update last activity */
- g_get_current_time (¤t);
- source->last_rtp_activity = GST_TIMEVAL_TO_TIME (current);
+ source->last_rtp_activity = current_time;
prevsender = RTP_SOURCE_IS_SENDER (source);
* Returns: a #GstFlowReturn.
*/
static GstFlowReturn
-rtp_session_send_bye_locked (RTPSession * sess, const gchar * reason)
+rtp_session_send_bye_locked (RTPSession * sess, const gchar * reason,
+ GstClockTime current_time)
{
GstFlowReturn result = GST_FLOW_OK;
RTPSource *source;
- GstClockTime current, interval;
- GTimeVal curtv;
+ GstClockTime interval;
g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
sess->first_rtcp = TRUE;
sess->sent_bye = FALSE;
- /* get current time */
- g_get_current_time (&curtv);
- current = GST_TIMEVAL_TO_TIME (curtv);
-
/* reschedule transmission */
- sess->last_rtcp_send_time = current;
+ sess->last_rtcp_send_time = current_time;
interval = calculate_rtcp_interval (sess, FALSE, TRUE);
- sess->next_rtcp_check_time = current + interval;
+ sess->next_rtcp_check_time = current_time + interval;
GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
* rtp_session_send_bye:
* @sess: an #RTPSession
* @reason: a reason or NULL
+ * @current_time: the current system time
*
* Stop the current @sess and schedule a BYE message for the other members.
*
* Returns: a #GstFlowReturn.
*/
GstFlowReturn
-rtp_session_send_bye (RTPSession * sess, const gchar * reason)
+rtp_session_send_bye (RTPSession * sess, const gchar * reason,
+ GstClockTime current_time)
{
GstFlowReturn result = GST_FLOW_OK;
g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
RTP_SESSION_LOCK (sess);
- result = rtp_session_send_bye_locked (sess, reason);
+ result = rtp_session_send_bye_locked (sess, reason, current_time);
RTP_SESSION_UNLOCK (sess);
return result;
/**
* rtp_session_next_timeout:
* @sess: an #RTPSession
- * @time: the current system time
+ * @current_time: the current system time
*
* Get the next time we should perform session maintenance tasks.
*
* current system time.
*/
GstClockTime
-rtp_session_next_timeout (RTPSession * sess, GstClockTime time)
+rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
{
GstClockTime result;
result = sess->next_rtcp_check_time;
GST_DEBUG ("current time: %" GST_TIME_FORMAT ", next :%" GST_TIME_FORMAT,
- GST_TIME_ARGS (time), GST_TIME_ARGS (result));
+ GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
- if (result < time) {
+ if (result < current_time) {
GST_DEBUG ("take current time as base");
/* our previous check time expired, start counting from the current time
* again. */
- result = time;
+ result = current_time;
}
if (sess->source->received_bye) {
GST_DEBUG ("first RTCP packet");
/* we are called for the first time */
result += calculate_rtcp_interval (sess, FALSE, TRUE);
- } else if (sess->next_rtcp_check_time < time) {
+ } else if (sess->next_rtcp_check_time < current_time) {
GST_DEBUG ("old check time expired, getting new timeout");
/* get a new timeout when we need to */
result += calculate_rtcp_interval (sess, FALSE, FALSE);
{
RTPSession *sess;
GstBuffer *rtcp;
- GstClockTime time;
+ GstClockTime current_time;
guint64 ntpnstime;
GstClockTime interval;
GstRTCPPacket packet;
guint32 lsr, dlsr;
/* get new stats */
- rtp_source_get_new_rb (source, data->time, &fractionlost, &packetslost,
- &exthighestseq, &jitter, &lsr, &dlsr);
+ rtp_source_get_new_rb (source, data->current_time, &fractionlost,
+ &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
/* packet is not yet filled, add report block for this source. */
gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
if (source->received_bye) {
/* if we received a BYE from the source, remove the source after some
* time. */
- if (data->time > source->bye_time &&
- data->time - source->bye_time > sess->stats.bye_timeout) {
+ if (data->current_time > source->bye_time &&
+ data->current_time - source->bye_time > sess->stats.bye_timeout) {
GST_DEBUG ("removing BYE source %08x", source->ssrc);
remove = TRUE;
byetimeout = TRUE;
}
/* sources that were inactive for more than 5 times the deterministic reporting
* interval get timed out. the min timeout is 5 seconds. */
- if (data->time > source->last_activity) {
+ if (data->current_time > source->last_activity) {
interval = MAX (data->interval * 5, 5 * GST_SECOND);
- if (data->time - source->last_activity > interval) {
+ if (data->current_time - source->last_activity > interval) {
GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
source->ssrc, GST_TIME_ARGS (source->last_activity));
remove = TRUE;
/* senders that did not send for a long time become a receiver, this also
* holds for our own source. */
if (is_sender) {
- if (data->time > source->last_rtp_activity) {
+ if (data->current_time > source->last_rtp_activity) {
interval = MAX (data->interval * 2, 5 * GST_SECOND);
- if (data->time - source->last_rtp_activity > interval) {
+ if (data->current_time - source->last_rtp_activity > interval) {
GST_DEBUG ("sender source %08x timed out and became receiver, last %"
GST_TIME_FORMAT, source->ssrc,
GST_TIME_ARGS (source->last_rtp_activity));
}
static gboolean
-is_rtcp_time (RTPSession * sess, GstClockTime time, ReportData * data)
+is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
{
GstClockTime new_send_time, elapsed;
gboolean result;
/* no need to check yet */
- if (sess->next_rtcp_check_time > time) {
+ if (sess->next_rtcp_check_time > current_time) {
GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %"
GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time),
- GST_TIME_ARGS (time));
+ GST_TIME_ARGS (current_time));
return FALSE;
}
/* get elapsed time since we last reported */
- elapsed = time - sess->last_rtcp_send_time;
+ elapsed = current_time - sess->last_rtcp_send_time;
/* perform forward reconsideration */
new_send_time = rtp_stats_add_rtcp_jitter (&sess->stats, data->interval);
new_send_time += sess->last_rtcp_send_time;
/* check if reconsideration */
- if (time < new_send_time) {
+ if (current_time < new_send_time) {
GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
GST_TIME_ARGS (new_send_time));
result = FALSE;
GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
GST_TIME_ARGS (new_send_time));
- sess->next_rtcp_check_time = time + new_send_time;
+ sess->next_rtcp_check_time = current_time + new_send_time;
}
return result;
}
/**
* rtp_session_on_timeout:
* @sess: an #RTPSession
- * @time: the current system time
+ * @current_time: the current system time
* @ntpnstime: the current NTP time in nanoseconds
*
* Perform maintenance actions after the timeout obtained with
* Returns: a #GstFlowReturn.
*/
GstFlowReturn
-rtp_session_on_timeout (RTPSession * sess, GstClockTime time, guint64 ntpnstime)
+rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
+ guint64 ntpnstime)
{
GstFlowReturn result = GST_FLOW_OK;
GList *item;
g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT,
- GST_TIME_ARGS (time), GST_TIME_ARGS (ntpnstime));
+ GST_TIME_ARGS (current_time), GST_TIME_ARGS (ntpnstime));
data.sess = sess;
data.rtcp = NULL;
- data.time = time;
+ data.current_time = current_time;
data.ntpnstime = ntpnstime;
data.is_bye = FALSE;
data.has_sdes = FALSE;
(GHRFunc) session_cleanup, &data);
/* see if we need to generate SR or RR packets */
- if (is_rtcp_time (sess, time, &data)) {
+ if (is_rtcp_time (sess, current_time, &data)) {
if (sess->source->received_bye) {
/* generate BYE instead */
session_bye (sess, &data);
/* we keep track of the last report time in order to timeout inactive
* receivers or senders */
- sess->last_rtcp_send_time = data.time;
+ sess->last_rtcp_send_time = data.current_time;
sess->first_rtcp = FALSE;
/* add SDES for this source when not already added */
RTPConflictingAddress *known_conflict = item->data;
GList *next_item = g_list_next (item);
- if (known_conflict->time < time - (data.interval *
+ if (known_conflict->time < current_time - (data.interval *
RTCP_INTERVAL_COLLISION_TIMEOUT)) {
sess->conflicting_addresses =
g_list_delete_link (sess->conflicting_addresses, item);