rtpbin: add on_npt_stop signal
authorWim Taymans <wim.taymans@collabora.co.uk>
Fri, 27 Mar 2009 16:44:57 +0000 (17:44 +0100)
committerTim-Philipp Müller <tim.muller@collabora.co.uk>
Tue, 11 Aug 2009 01:30:41 +0000 (02:30 +0100)
Add the on_npt_stop signal to rtpbin and rtpjitterbuffer to notify the
application that the NPT stop position has been reached.

gst/rtpmanager/gstrtpbin.c
gst/rtpmanager/gstrtpbin.h
gst/rtpmanager/gstrtpjitterbuffer.c
gst/rtpmanager/gstrtpjitterbuffer.h

index a845bbb..4322ee0 100644 (file)
@@ -233,6 +233,7 @@ enum
   SIGNAL_ON_BYE_TIMEOUT,
   SIGNAL_ON_TIMEOUT,
   SIGNAL_ON_SENDER_TIMEOUT,
+  SIGNAL_ON_NPT_STOP,
   LAST_SIGNAL
 };
 
@@ -459,6 +460,13 @@ on_sender_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
       sess->id, ssrc);
 }
 
+static void
+on_npt_stop (GstElement * jbuf, GstRtpBinStream * stream)
+{
+  g_signal_emit (stream->bin, gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP], 0,
+      stream->session->id, stream->ssrc);
+}
+
 /* create a session with the given id.  Must be called with RTP_BIN_LOCK */
 static GstRtpBinSession *
 create_session (GstRtpBin * rtpbin, gint id)
@@ -1091,6 +1099,7 @@ create_stream (GstRtpBinSession * session, guint32 ssrc)
   /* provide clock_rate to the jitterbuffer when needed */
   g_signal_connect (buffer, "request-pt-map",
       (GCallback) pt_map_requested, session);
+  g_signal_connect (buffer, "on-npt-stop", (GCallback) on_npt_stop, stream);
 
   /* configure latency and packet lost */
   g_object_set (buffer, "latency", session->bin->latency, NULL);
@@ -1375,6 +1384,20 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass)
       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
       G_TYPE_UINT, G_TYPE_UINT);
 
+  /**
+   * GstRtpBin::on-npt-stop:
+   * @rtpbin: the object which received the signal
+   * @session: the session
+   * @ssrc: the SSRC 
+   *
+   * Notify that SSRC sender has sent data up to the configured NPT stop time.
+   */
+  gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP] =
+      g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_npt_stop),
+      NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
+      G_TYPE_UINT, G_TYPE_UINT);
+
   g_object_class_install_property (gobject_class, PROP_SDES_CNAME,
       g_param_spec_string ("sdes-cname", "SDES CNAME",
           "The CNAME to put in SDES messages of this session",
index e7658f5..a984d4d 100644 (file)
@@ -82,6 +82,7 @@ struct _GstRtpBinClass {
   void     (*on_bye_timeout)    (GstRtpBin *rtpbin, guint session, guint32 ssrc);
   void     (*on_timeout)        (GstRtpBin *rtpbin, guint session, guint32 ssrc);
   void     (*on_sender_timeout) (GstRtpBin *rtpbin, guint session, guint32 ssrc);
+  void     (*on_npt_stop)       (GstRtpBin *rtpbin, guint session, guint32 ssrc);
 };
 
 GType gst_rtp_bin_get_type (void);
index 2d3d445..0fa2395 100644 (file)
@@ -88,6 +88,7 @@ enum
   SIGNAL_REQUEST_PT_MAP,
   SIGNAL_CLEAR_PT_MAP,
   SIGNAL_HANDLE_SYNC,
+  SIGNAL_ON_NPT_STOP,
   LAST_SIGNAL
 };
 
@@ -151,6 +152,15 @@ struct _GstRtpJitterBufferPrivate
   /* the next expected seqnum we receive */
   guint32 next_in_seqnum;
 
+  /* start and stop ranges */
+  GstClockTime npt_start;
+  GstClockTime npt_stop;
+  guint64 ext_timestamp;
+  guint64 last_elapsed;
+  guint64 estimated_eos;
+  GstClockID eos_id;
+  gboolean reached_npt_stop;
+
   /* state */
   gboolean eos;
 
@@ -356,6 +366,19 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
 
   /**
+   * GstRtpJitterBuffer::on-npt-stop
+   * @buffer: the object which received the signal
+   *
+   * Signal that the jitterbufer has pushed the RTP packet that corresponds to
+   * the npt-stop position.
+   */
+  gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
+      g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
+          on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
+      G_TYPE_NONE, 0, G_TYPE_NONE);
+
+  /**
    * GstRtpJitterBuffer::clear-pt-map:
    * @buffer: the object which received the signal
    *
@@ -629,6 +652,7 @@ gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
   GstRtpJitterBufferPrivate *priv;
   GstStructure *caps_struct;
   guint val;
+  GstClockTime tval;
 
   priv = jitterbuffer->priv;
 
@@ -647,14 +671,15 @@ gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
 
   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
 
-  /* gah, clock-base is uint. If we don't have a base, we will use the first
-   * buffer timestamp as the base time. This will screw up sync but it's better
-   * than nothing. */
+  /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
+   * can use this to track the amount of time elapsed on the sender. */
   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
     priv->clock_base = val;
   else
     priv->clock_base = -1;
 
+  priv->ext_timestamp = priv->clock_base;
+
   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
       priv->clock_base);
 
@@ -668,6 +693,23 @@ gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
 
   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
 
+  /* the start and stop times. The seqnum-base corresponds to the start time. We
+   * will keep track of the seqnums on the output and when we reach the one
+   * corresponding to npt-stop, we emit the npt-stop-reached signal */
+  if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
+    priv->npt_start = tval;
+  else
+    priv->npt_start = 0;
+
+  if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
+    priv->npt_stop = tval;
+  else
+    priv->npt_stop = -1;
+
+  GST_DEBUG_OBJECT (jitterbuffer,
+      "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
+      GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
+
   return TRUE;
 
   /* ERRORS */
@@ -800,6 +842,10 @@ gst_rtp_jitter_buffer_change_state (GstElement * element,
       /* reset negotiated values */
       priv->clock_rate = -1;
       priv->clock_base = -1;
+      priv->last_elapsed = 0;
+      priv->estimated_eos = -1;
+      priv->reached_npt_stop = FALSE;
+      priv->ext_timestamp = -1;
       priv->peer_latency = 0;
       priv->last_pt = -1;
       /* block until we go to PLAYING */
@@ -1079,6 +1125,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
       timestamp);
 
   seqnum = gst_rtp_buffer_get_seq (buffer);
+
   GST_DEBUG_OBJECT (jitterbuffer,
       "Received packet #%d at time %" GST_TIME_FORMAT, seqnum,
       GST_TIME_ARGS (timestamp));
@@ -1254,6 +1301,48 @@ apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
   return timestamp;
 }
 
+static GstClockTime
+get_sync_time (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
+{
+  GstClockTime result;
+  GstRtpJitterBufferPrivate *priv;
+
+  priv = jitterbuffer->priv;
+
+  result = timestamp + GST_ELEMENT_CAST (jitterbuffer)->base_time;
+  /* add latency, this includes our own latency and the peer latency. */
+  result += (priv->latency_ms * GST_MSECOND);
+  result += priv->peer_latency;
+
+  return result;
+}
+
+static gboolean
+eos_reached (GstClock * clock, GstClockTime time, GstClockID id,
+    GstRtpJitterBuffer * jitterbuffer)
+{
+  GstRtpJitterBufferPrivate *priv;
+
+  priv = jitterbuffer->priv;
+
+  JBUF_LOCK_CHECK (priv, flushing);
+  if (priv->waiting) {
+    GST_DEBUG_OBJECT (jitterbuffer, "got the NPT timeout");
+    priv->reached_npt_stop = TRUE;
+    JBUF_SIGNAL (priv);
+  }
+  JBUF_UNLOCK (priv);
+
+  return TRUE;
+
+  /* ERRORS */
+flushing:
+  {
+    JBUF_UNLOCK (priv);
+    return FALSE;
+  }
+}
+
 /**
  * This funcion will push out buffers on the source pad.
  *
@@ -1272,6 +1361,9 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
   GstClockTime timestamp, out_time;
   gboolean discont = FALSE;
   gint gap;
+  GstClock *clock;
+  GstClockID id;
+  GstClockTime sync_time;
 
   priv = jitterbuffer->priv;
 
@@ -1279,6 +1371,7 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
 again:
   GST_DEBUG_OBJECT (jitterbuffer, "Peeking item");
   while (TRUE) {
+    id = NULL;
     /* always wait if we are blocked */
     if (G_LIKELY (!priv->blocked)) {
       /* if we have a packet, we can exit the loop and grab it */
@@ -1287,11 +1380,38 @@ again:
       /* no packets but we are EOS, do eos logic */
       if (G_UNLIKELY (priv->eos))
         goto do_eos;
+      /* underrun, wait for packets or flushing now if we are expecting an EOS
+       * timeout, set the async timer for it too */
+      if (priv->estimated_eos != -1 && !priv->reached_npt_stop) {
+        sync_time = get_sync_time (jitterbuffer, priv->estimated_eos);
+
+        GST_OBJECT_LOCK (jitterbuffer);
+        clock = GST_ELEMENT_CLOCK (jitterbuffer);
+        if (clock) {
+          GST_DEBUG_OBJECT (jitterbuffer, "scheduling timeout");
+          id = gst_clock_new_single_shot_id (clock, sync_time);
+          gst_clock_id_wait_async (id, (GstClockCallback) eos_reached,
+              jitterbuffer);
+        }
+        GST_OBJECT_UNLOCK (jitterbuffer);
+      }
     }
-    /* underrun, wait for packets or flushing now */
+    /* now we wait */
     priv->waiting = TRUE;
-    JBUF_WAIT_CHECK (priv, flushing);
+    JBUF_WAIT (priv);
     priv->waiting = FALSE;
+
+    if (id) {
+      /* unschedule any pending async notifications we might have */
+      gst_clock_id_unschedule (id);
+      gst_clock_id_unref (id);
+    }
+    if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))
+      goto flushing;
+
+    if (id && priv->reached_npt_stop) {
+      goto do_npt_stop;
+    }
   }
 
   /* peek a buffer, we're just looking at the timestamp and the sequence number.
@@ -1346,10 +1466,7 @@ again:
    * must be before this packet) we can wait for it until the deadline for this
    * packet expires. */
   if (G_UNLIKELY (gap != 0 && out_time != -1)) {
-    GstClockID id;
-    GstClockTime sync_time;
     GstClockReturn ret;
-    GstClock *clock;
     GstClockTime duration = GST_CLOCK_TIME_NONE;
 
     if (gap > 0) {
@@ -1395,10 +1512,7 @@ again:
         GST_TIME_ARGS (out_time));
 
     /* prepare for sync against clock */
-    sync_time = out_time + GST_ELEMENT_CAST (jitterbuffer)->base_time;
-    /* add latency, this includes our own latency and the peer latency. */
-    sync_time += (priv->latency_ms * GST_MSECOND);
-    sync_time += priv->peer_latency;
+    sync_time = get_sync_time (jitterbuffer, out_time);
 
     /* create an entry for the clock */
     id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
@@ -1484,6 +1598,37 @@ push_buffer:
   /* apply timestamp with offset to buffer now */
   GST_BUFFER_TIMESTAMP (outbuf) = out_time;
 
+  /* update the elapsed time when we need to check against the npt stop time. */
+  if (priv->npt_stop != -1 && priv->ext_timestamp != -1
+      && priv->clock_base != -1) {
+    guint64 ext_time, elapsed, estimated;
+    guint32 rtp_time;
+
+    rtp_time = gst_rtp_buffer_get_timestamp (outbuf);
+
+    ext_time = gst_rtp_buffer_ext_timestamp (&priv->ext_timestamp, rtp_time);
+    if (ext_time > priv->clock_base)
+      elapsed = ext_time - priv->clock_base;
+    else
+      elapsed = 0;
+
+    elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
+
+    if (elapsed > priv->last_elapsed) {
+      priv->last_elapsed = elapsed;
+
+      if (elapsed > 0)
+        estimated = gst_util_uint64_scale (out_time, priv->npt_stop, elapsed);
+      else
+        estimated = -1;
+
+      GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
+          GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
+
+      priv->estimated_eos = estimated;
+    }
+  }
+
   /* now we are ready to push the buffer. Save the seqnum and release the lock
    * so the other end can push stuff in the queue again. */
   priv->last_popped_seqnum = seqnum;
@@ -1512,6 +1657,16 @@ do_eos:
     JBUF_UNLOCK (priv);
     return;
   }
+do_npt_stop:
+  {
+    /* store result, we are flushing now */
+    GST_DEBUG_OBJECT (jitterbuffer, "We reached the NPT stop");
+    JBUF_UNLOCK (priv);
+
+    g_signal_emit (jitterbuffer,
+        gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP], 0, NULL);
+    return;
+  }
 flushing:
   {
     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
index 45e6897..6d7610e 100644 (file)
@@ -69,9 +69,10 @@ struct _GstRtpJitterBufferClass
   GstElementClass parent_class;
 
   /* signals */
-  GstCaps* (*request_pt_map) (GstRtpJitterBuffer *buffer, guint pt);
+  GstCaps* (*request_pt_map)   (GstRtpJitterBuffer *buffer, guint pt);
 
-  void     (*handle_sync)    (GstRtpJitterBuffer *buffer, GstStructure *s);
+  void     (*handle_sync)      (GstRtpJitterBuffer *buffer, GstStructure *s);
+  void     (*on_npt_stop)      (GstRtpJitterBuffer *buffer);
 
   /* actions */
   void     (*clear_pt_map)   (GstRtpJitterBuffer *buffer);