gst/realmedia/rdtmanager.c: Include the new rdt jitterbuffer in the session manager.
authorWim Taymans <wim.taymans@gmail.com>
Wed, 27 Aug 2008 10:02:06 +0000 (10:02 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Wed, 27 Aug 2008 10:02:06 +0000 (10:02 +0000)
Original commit message from CVS:
* gst/realmedia/rdtmanager.c: (create_session), (activate_session),
(free_session), (gst_rdt_manager_query_src),
(gst_rdt_manager_src_activate_push),
(gst_rdt_manager_handle_data_packet), (gst_rdt_manager_chain_rdt),
(gst_rdt_manager_loop), (create_recv_rtp):
Include the new rdt jitterbuffer in the session manager.

ChangeLog
gst/realmedia/rdtmanager.c

index ea517c8..bd798bf 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,14 @@
 2008-08-27  Wim Taymans  <wim.taymans@collabora.co.uk>
 
+       * gst/realmedia/rdtmanager.c: (create_session), (activate_session),
+       (free_session), (gst_rdt_manager_query_src),
+       (gst_rdt_manager_src_activate_push),
+       (gst_rdt_manager_handle_data_packet), (gst_rdt_manager_chain_rdt),
+       (gst_rdt_manager_loop), (create_recv_rtp):
+       Include the new rdt jitterbuffer in the session manager.
+
+2008-08-27  Wim Taymans  <wim.taymans@collabora.co.uk>
+
        * gst/realmedia/rdtdepay.c: (gst_rdt_depay_class_init),
        (gst_rdt_depay_finalize), (gst_rdt_depay_setcaps),
        (gst_rdt_depay_push), (gst_rdt_depay_handle_data),
index aad041f..144a0f4 100644 (file)
@@ -52,7 +52,9 @@
 
 /* #define HAVE_RTCP */
 
+#include "gstrdtbuffer.h"
 #include "rdtmanager.h"
+#include "rdtjitterbuffer.h"
 
 GST_DEBUG_CATEGORY_STATIC (rdtmanager_debug);
 #define GST_CAT_DEFAULT (rdtmanager_debug)
@@ -123,6 +125,10 @@ static void gst_rdt_manager_set_property (GObject * object,
 static void gst_rdt_manager_get_property (GObject * object,
     guint prop_id, GValue * value, GParamSpec * pspec);
 
+static gboolean gst_rdt_manager_query_src (GstPad * pad, GstQuery * query);
+static gboolean gst_rdt_manager_src_activate_push (GstPad * pad,
+    gboolean active);
+
 static GstClock *gst_rdt_manager_provide_clock (GstElement * element);
 static GstStateChangeReturn gst_rdt_manager_change_state (GstElement * element,
     GstStateChange transition);
@@ -130,11 +136,32 @@ static GstPad *gst_rdt_manager_request_new_pad (GstElement * element,
     GstPadTemplate * templ, const gchar * name);
 static void gst_rdt_manager_release_pad (GstElement * element, GstPad * pad);
 
-static GstFlowReturn gst_rdt_manager_chain_rtp (GstPad * pad,
+static GstFlowReturn gst_rdt_manager_chain_rdt (GstPad * pad,
     GstBuffer * buffer);
 static GstFlowReturn gst_rdt_manager_chain_rtcp (GstPad * pad,
     GstBuffer * buffer);
+static void gst_rdt_manager_loop (GstPad * pad);
+
+static guint gst_rdt_manager_signals[LAST_SIGNAL] = { 0 };
+
+#define JBUF_LOCK(sess)   (g_mutex_lock ((sess)->jbuf_lock))
+
+#define JBUF_LOCK_CHECK(sess,label) G_STMT_START {    \
+  JBUF_LOCK (sess);                                   \
+  if (sess->srcresult != GST_FLOW_OK)                 \
+    goto label;                                       \
+} G_STMT_END
+
+#define JBUF_UNLOCK(sess) (g_mutex_unlock ((sess)->jbuf_lock))
+#define JBUF_WAIT(sess)   (g_cond_wait ((sess)->jbuf_cond, (sess)->jbuf_lock))
+
+#define JBUF_WAIT_CHECK(sess,label) G_STMT_START {    \
+  JBUF_WAIT(sess);                                    \
+  if (sess->srcresult != GST_FLOW_OK)                 \
+    goto label;                                       \
+} G_STMT_END
 
+#define JBUF_SIGNAL(sess) (g_cond_signal ((sess)->jbuf_cond))
 
 /* Manages the receiving end of the packets.
  *
@@ -152,13 +179,39 @@ struct _GstRDTManagerSession
   /* we only support one ssrc and one pt */
   guint32 ssrc;
   guint8 pt;
+  gint clock_rate;
   GstCaps *caps;
 
+  GstSegment segment;
+
+  /* the last seqnum we pushed out */
+  guint32 last_popped_seqnum;
+  /* the next expected seqnum */
+  guint32 next_seqnum;
+  /* last output time */
+  GstClockTime last_out_time;
+
   /* the pads of the session */
   GstPad *recv_rtp_sink;
   GstPad *recv_rtp_src;
   GstPad *recv_rtcp_sink;
   GstPad *rtcp_src;
+
+  GstFlowReturn srcresult;
+  gboolean blocked;
+  gboolean eos;
+  gboolean waiting;
+  gboolean discont;
+  GstClockID clock_id;
+
+  /* jitterbuffer, lock and cond */
+  RDTJitterBuffer *jbuf;
+  GMutex *jbuf_lock;
+  GCond *jbuf_cond;
+
+  /* some accounting */
+  guint64 num_late;
+  guint64 num_duplicates;
 };
 
 /* find a session with the given id */
@@ -185,19 +238,77 @@ create_session (GstRDTManager * rdtmanager, gint id)
   sess = g_new0 (GstRDTManagerSession, 1);
   sess->id = id;
   sess->dec = rdtmanager;
+  sess->jbuf = rdt_jitter_buffer_new ();
+  sess->jbuf_lock = g_mutex_new ();
+  sess->jbuf_cond = g_cond_new ();
   rdtmanager->sessions = g_slist_prepend (rdtmanager->sessions, sess);
 
   return sess;
 }
 
+static gboolean
+activate_session (GstRDTManager * rdtmanager, GstRDTManagerSession * session,
+    guint32 ssrc, guint8 pt)
+{
+  GstPadTemplate *templ;
+  GstElementClass *klass;
+  gchar *name;
+  GstCaps *caps;
+  GValue ret = { 0 };
+  GValue args[3] = { {0}
+  , {0}
+  , {0}
+  };
+
+  GST_DEBUG_OBJECT (rdtmanager, "creating stream");
+
+  session->ssrc = ssrc;
+  session->pt = pt;
+
+  /* get pt map */
+  g_value_init (&args[0], GST_TYPE_ELEMENT);
+  g_value_set_object (&args[0], rdtmanager);
+  g_value_init (&args[1], G_TYPE_UINT);
+  g_value_set_uint (&args[1], session->id);
+  g_value_init (&args[2], G_TYPE_UINT);
+  g_value_set_uint (&args[2], pt);
+
+  g_value_init (&ret, GST_TYPE_CAPS);
+  g_value_set_boxed (&ret, NULL);
+
+  g_signal_emitv (args, gst_rdt_manager_signals[SIGNAL_REQUEST_PT_MAP], 0,
+      &ret);
+
+  caps = (GstCaps *) g_value_get_boxed (&ret);
+
+  name = g_strdup_printf ("recv_rtp_src_%d_%u_%d", session->id, ssrc, pt);
+  klass = GST_ELEMENT_GET_CLASS (rdtmanager);
+  templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%d_%d_%d");
+  session->recv_rtp_src = gst_pad_new_from_template (templ, name);
+  g_free (name);
+
+  gst_pad_set_caps (session->recv_rtp_src, caps);
+
+  gst_pad_set_element_private (session->recv_rtp_src, session);
+  gst_pad_set_query_function (session->recv_rtp_src, gst_rdt_manager_query_src);
+  gst_pad_set_activatepush_function (session->recv_rtp_src,
+      gst_rdt_manager_src_activate_push);
+
+  gst_pad_set_active (session->recv_rtp_src, TRUE);
+  gst_element_add_pad (GST_ELEMENT_CAST (rdtmanager), session->recv_rtp_src);
+
+  return TRUE;
+}
+
 static void
 free_session (GstRDTManagerSession * session)
 {
+  g_object_unref (session->jbuf);
+  g_cond_free (session->jbuf_cond);
+  g_mutex_free (session->jbuf_lock);
   g_free (session);
 }
 
-static guint gst_rdt_manager_signals[LAST_SIGNAL] = { 0 };
-
 GST_BOILERPLATE (GstRDTManager, gst_rdt_manager, GstElement, GST_TYPE_ELEMENT);
 
 static void
@@ -418,7 +529,8 @@ gst_rdt_manager_query_src (GstPad * pad, GstQuery * query)
     case GST_QUERY_LATENCY:
     {
       /* we pretend to be live with a 3 second latency */
-      gst_query_set_latency (query, TRUE, 3 * GST_SECOND, -1);
+      gst_query_set_latency (query, TRUE, 5 * GST_SECOND, -1);
+      GST_DEBUG_OBJECT (rdtmanager, "reporting 5 seconds of latency");
       res = TRUE;
       break;
     }
@@ -429,18 +541,125 @@ gst_rdt_manager_query_src (GstPad * pad, GstQuery * query)
   return res;
 }
 
+static gboolean
+gst_rdt_manager_src_activate_push (GstPad * pad, gboolean active)
+{
+  gboolean result = TRUE;
+  GstRDTManager *rdtmanager;
+  GstRDTManagerSession *session;
+
+  session = gst_pad_get_element_private (pad);
+  rdtmanager = session->dec;
+
+  if (active) {
+    /* allow data processing */
+    JBUF_LOCK (session);
+    GST_DEBUG_OBJECT (rdtmanager, "Enabling pop on queue");
+    /* Mark as non flushing */
+    session->srcresult = GST_FLOW_OK;
+    gst_segment_init (&session->segment, GST_FORMAT_TIME);
+    session->last_popped_seqnum = -1;
+    session->last_out_time = -1;
+    session->next_seqnum = -1;
+    session->eos = FALSE;
+    JBUF_UNLOCK (session);
+
+    /* start pushing out buffers */
+    GST_DEBUG_OBJECT (rdtmanager, "Starting task on srcpad");
+    gst_pad_start_task (pad, (GstTaskFunction) gst_rdt_manager_loop, pad);
+  } else {
+    /* make sure all data processing stops ASAP */
+    JBUF_LOCK (session);
+    /* mark ourselves as flushing */
+    session->srcresult = GST_FLOW_WRONG_STATE;
+    GST_DEBUG_OBJECT (rdtmanager, "Disabling pop on queue");
+    /* this unblocks any waiting pops on the src pad task */
+    JBUF_SIGNAL (session);
+    /* unlock clock, we just unschedule, the entry will be released by
+     * the locking streaming thread. */
+    if (session->clock_id)
+      gst_clock_id_unschedule (session->clock_id);
+    JBUF_UNLOCK (session);
+
+    /* NOTE this will hardlock if the state change is called from the src pad
+     * task thread because we will _join() the thread. */
+    GST_DEBUG_OBJECT (rdtmanager, "Stopping task on srcpad");
+    result = gst_pad_stop_task (pad);
+  }
+
+  return result;
+}
+
 static GstFlowReturn
-gst_rdt_manager_chain_rtp (GstPad * pad, GstBuffer * buffer)
+gst_rdt_manager_handle_data_packet (GstRDTManagerSession * session,
+    GstClockTime timestamp, GstRDTPacket * packet)
+{
+  GstRDTManager *rdtmanager;
+  guint16 seqnum;
+  gboolean tail;
+  GstFlowReturn res;
+  GstBuffer *buffer;
+
+  rdtmanager = session->dec;
+
+  res = GST_FLOW_OK;
+
+  seqnum = 0;
+  GST_DEBUG_OBJECT (rdtmanager,
+      "Received packet #%d at time %" GST_TIME_FORMAT, seqnum,
+      GST_TIME_ARGS (timestamp));
+
+  buffer = gst_rdt_packet_to_buffer (packet);
+
+  JBUF_LOCK_CHECK (session, out_flushing);
+
+  /* insert the packet into the queue now, FIXME, use seqnum */
+  if (!rdt_jitter_buffer_insert (session->jbuf, buffer, timestamp,
+          session->clock_rate, &tail))
+    goto duplicate;
+
+  /* signal addition of new buffer when the _loop is waiting. */
+  if (session->waiting)
+    JBUF_SIGNAL (session);
+
+finished:
+  JBUF_UNLOCK (session);
+
+  return res;
+
+  /* ERRORS */
+out_flushing:
+  {
+    res = session->srcresult;
+    GST_DEBUG_OBJECT (rdtmanager, "flushing %s", gst_flow_get_name (res));
+    gst_buffer_unref (buffer);
+    goto finished;
+  }
+duplicate:
+  {
+    GST_WARNING_OBJECT (rdtmanager, "Duplicate packet #%d detected, dropping",
+        seqnum);
+    session->num_duplicates++;
+    gst_buffer_unref (buffer);
+    goto finished;
+  }
+}
+
+static GstFlowReturn
+gst_rdt_manager_chain_rdt (GstPad * pad, GstBuffer * buffer)
 {
   GstFlowReturn res;
   GstRDTManager *rdtmanager;
   GstRDTManagerSession *session;
+  GstClockTime timestamp;
+  GstRDTPacket packet;
   guint32 ssrc;
   guint8 pt;
+  gboolean more;
 
   rdtmanager = GST_RDT_MANAGER (GST_PAD_PARENT (pad));
 
-  GST_DEBUG_OBJECT (rdtmanager, "got rtp packet");
+  GST_DEBUG_OBJECT (rdtmanager, "got RDT packet");
 
   ssrc = 0;
   pt = 0;
@@ -452,61 +671,138 @@ gst_rdt_manager_chain_rtp (GstPad * pad, GstBuffer * buffer)
 
   /* see if we have the pad */
   if (!session->active) {
-    GstPadTemplate *templ;
-    GstElementClass *klass;
-    gchar *name;
-    GstCaps *caps;
-    GValue ret = { 0 };
-    GValue args[3] = { {0}
-    , {0}
-    , {0}
-    };
-
-    GST_DEBUG_OBJECT (rdtmanager, "creating stream");
-
-    session->ssrc = ssrc;
-    session->pt = pt;
-
-    /* get pt map */
-    g_value_init (&args[0], GST_TYPE_ELEMENT);
-    g_value_set_object (&args[0], rdtmanager);
-    g_value_init (&args[1], G_TYPE_UINT);
-    g_value_set_uint (&args[1], session->id);
-    g_value_init (&args[2], G_TYPE_UINT);
-    g_value_set_uint (&args[2], pt);
-
-    g_value_init (&ret, GST_TYPE_CAPS);
-    g_value_set_boxed (&ret, NULL);
-
-    g_signal_emitv (args, gst_rdt_manager_signals[SIGNAL_REQUEST_PT_MAP], 0,
-        &ret);
-
-    caps = (GstCaps *) g_value_get_boxed (&ret);
-
-    name = g_strdup_printf ("recv_rtp_src_%d_%u_%d", session->id, ssrc, pt);
-    klass = GST_ELEMENT_GET_CLASS (rdtmanager);
-    templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%d_%d_%d");
-    session->recv_rtp_src = gst_pad_new_from_template (templ, name);
-    g_free (name);
-
-    gst_pad_set_caps (session->recv_rtp_src, caps);
-
-    gst_pad_set_element_private (session->recv_rtp_src, session);
-    gst_pad_set_query_function (session->recv_rtp_src,
-        gst_rdt_manager_query_src);
-    gst_pad_set_active (session->recv_rtp_src, TRUE);
-    gst_element_add_pad (GST_ELEMENT_CAST (rdtmanager), session->recv_rtp_src);
-
+    activate_session (rdtmanager, session, ssrc, pt);
     session->active = TRUE;
   }
 
-  gst_buffer_set_caps (buffer, GST_PAD_CAPS (session->recv_rtp_src));
+  if (GST_BUFFER_IS_DISCONT (buffer)) {
+    GST_DEBUG_OBJECT (rdtmanager, "received discont");
+    session->discont = TRUE;
+  }
+
+  res = GST_FLOW_OK;
 
-  res = gst_pad_push (session->recv_rtp_src, buffer);
+  /* take the timestamp of the buffer. This is the time when the packet was
+   * received and is used to calculate jitter and clock skew. We will adjust
+   * this timestamp with the smoothed value after processing it in the
+   * jitterbuffer. */
+  timestamp = GST_BUFFER_TIMESTAMP (buffer);
+  /* bring to running time */
+  timestamp = gst_segment_to_running_time (&session->segment, GST_FORMAT_TIME,
+      timestamp);
+
+  more = gst_rdt_buffer_get_first_packet (buffer, &packet);
+  while (more) {
+    GstRDTType type;
+
+    type = gst_rdt_packet_get_type (&packet);
+    GST_DEBUG_OBJECT (rdtmanager, "Have packet of type %04x", type);
+
+    if (GST_RDT_IS_DATA_TYPE (type)) {
+      GST_DEBUG_OBJECT (rdtmanager, "We have a data packet");
+      res = gst_rdt_manager_handle_data_packet (session, timestamp, &packet);
+    } else {
+      switch (type) {
+        default:
+          GST_DEBUG_OBJECT (rdtmanager, "Ignoring packet");
+          break;
+      }
+    }
+    if (res != GST_FLOW_OK)
+      break;
+
+    more = gst_rdt_packet_move_to_next (&packet);
+  }
+
+  gst_buffer_unref (buffer);
 
   return res;
 }
 
+/* push packets from the queue to the downstream demuxer */
+static void
+gst_rdt_manager_loop (GstPad * pad)
+{
+  GstRDTManager *rdtmanager;
+  GstRDTManagerSession *session;
+  GstBuffer *buffer;
+  GstFlowReturn result;
+
+  rdtmanager = GST_RDT_MANAGER (GST_PAD_PARENT (pad));
+
+  session = gst_pad_get_element_private (pad);
+
+  JBUF_LOCK_CHECK (session, flushing);
+  GST_DEBUG_OBJECT (rdtmanager, "Peeking item");
+  while (TRUE) {
+    /* always wait if we are blocked */
+    if (!session->blocked) {
+      /* if we have a packet, we can exit the loop and grab it */
+      if (rdt_jitter_buffer_num_packets (session->jbuf) > 0)
+        break;
+      /* no packets but we are EOS, do eos logic */
+      if (session->eos)
+        goto do_eos;
+    }
+    /* underrun, wait for packets or flushing now */
+    session->waiting = TRUE;
+    JBUF_WAIT_CHECK (session, flushing);
+    session->waiting = FALSE;
+  }
+
+  buffer = rdt_jitter_buffer_pop (session->jbuf);
+
+  GST_DEBUG_OBJECT (rdtmanager, "Got item %p", buffer);
+
+  if (session->discont) {
+    GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+    session->discont = FALSE;
+  }
+
+  gst_buffer_set_caps (buffer, GST_PAD_CAPS (session->recv_rtp_src));
+  JBUF_UNLOCK (session);
+
+  result = gst_pad_push (session->recv_rtp_src, buffer);
+  if (result != GST_FLOW_OK)
+    goto pause;
+
+  return;
+
+  /* ERRORS */
+flushing:
+  {
+    GST_DEBUG_OBJECT (rdtmanager, "we are flushing");
+    gst_pad_pause_task (session->recv_rtp_src);
+    JBUF_UNLOCK (session);
+    return;
+  }
+do_eos:
+  {
+    /* store result, we are flushing now */
+    GST_DEBUG_OBJECT (rdtmanager, "We are EOS, pushing EOS downstream");
+    session->srcresult = GST_FLOW_UNEXPECTED;
+    gst_pad_pause_task (session->recv_rtp_src);
+    gst_pad_push_event (session->recv_rtp_src, gst_event_new_eos ());
+    JBUF_UNLOCK (session);
+    return;
+  }
+pause:
+  {
+    const gchar *reason = gst_flow_get_name (result);
+
+    GST_DEBUG_OBJECT (rdtmanager, "pausing task, reason %s", reason);
+
+    JBUF_LOCK (session);
+    /* store result */
+    session->srcresult = result;
+    /* we don't post errors or anything because upstream will do that for us
+     * when we pass the return value upstream. */
+    gst_pad_pause_task (session->recv_rtp_src);
+    JBUF_UNLOCK (session);
+    return;
+  }
+}
+
 static GstFlowReturn
 gst_rdt_manager_chain_rtcp (GstPad * pad, GstBuffer * buffer)
 {
@@ -771,7 +1067,7 @@ create_recv_rtp (GstRDTManager * rdtmanager, GstPadTemplate * templ,
   session->recv_rtp_sink = gst_pad_new_from_template (templ, name);
   gst_pad_set_element_private (session->recv_rtp_sink, session);
   gst_pad_set_chain_function (session->recv_rtp_sink,
-      gst_rdt_manager_chain_rtp);
+      gst_rdt_manager_chain_rdt);
   gst_pad_set_active (session->recv_rtp_sink, TRUE);
   gst_element_add_pad (GST_ELEMENT_CAST (rdtmanager), session->recv_rtp_sink);