/* #define HAVE_RTCP */
+#include "gstrdtbuffer.h"
#include "rdtmanager.h"
+#include "rdtjitterbuffer.h"
GST_DEBUG_CATEGORY_STATIC (rdtmanager_debug);
#define GST_CAT_DEFAULT (rdtmanager_debug)
static void gst_rdt_manager_get_property (GObject * object,
guint prop_id, GValue * value, GParamSpec * pspec);
+static gboolean gst_rdt_manager_query_src (GstPad * pad, GstQuery * query);
+static gboolean gst_rdt_manager_src_activate_push (GstPad * pad,
+ gboolean active);
+
static GstClock *gst_rdt_manager_provide_clock (GstElement * element);
static GstStateChangeReturn gst_rdt_manager_change_state (GstElement * element,
GstStateChange transition);
GstPadTemplate * templ, const gchar * name);
static void gst_rdt_manager_release_pad (GstElement * element, GstPad * pad);
-static GstFlowReturn gst_rdt_manager_chain_rtp (GstPad * pad,
+static GstFlowReturn gst_rdt_manager_chain_rdt (GstPad * pad,
GstBuffer * buffer);
static GstFlowReturn gst_rdt_manager_chain_rtcp (GstPad * pad,
GstBuffer * buffer);
+static void gst_rdt_manager_loop (GstPad * pad);
+
+static guint gst_rdt_manager_signals[LAST_SIGNAL] = { 0 };
+
+#define JBUF_LOCK(sess) (g_mutex_lock ((sess)->jbuf_lock))
+
+#define JBUF_LOCK_CHECK(sess,label) G_STMT_START { \
+ JBUF_LOCK (sess); \
+ if (sess->srcresult != GST_FLOW_OK) \
+ goto label; \
+} G_STMT_END
+
+#define JBUF_UNLOCK(sess) (g_mutex_unlock ((sess)->jbuf_lock))
+#define JBUF_WAIT(sess) (g_cond_wait ((sess)->jbuf_cond, (sess)->jbuf_lock))
+
+#define JBUF_WAIT_CHECK(sess,label) G_STMT_START { \
+ JBUF_WAIT(sess); \
+ if (sess->srcresult != GST_FLOW_OK) \
+ goto label; \
+} G_STMT_END
+#define JBUF_SIGNAL(sess) (g_cond_signal ((sess)->jbuf_cond))
/* Manages the receiving end of the packets.
*
/* we only support one ssrc and one pt */
guint32 ssrc;
guint8 pt;
+ gint clock_rate;
GstCaps *caps;
+ GstSegment segment;
+
+ /* the last seqnum we pushed out */
+ guint32 last_popped_seqnum;
+ /* the next expected seqnum */
+ guint32 next_seqnum;
+ /* last output time */
+ GstClockTime last_out_time;
+
/* the pads of the session */
GstPad *recv_rtp_sink;
GstPad *recv_rtp_src;
GstPad *recv_rtcp_sink;
GstPad *rtcp_src;
+
+ GstFlowReturn srcresult;
+ gboolean blocked;
+ gboolean eos;
+ gboolean waiting;
+ gboolean discont;
+ GstClockID clock_id;
+
+ /* jitterbuffer, lock and cond */
+ RDTJitterBuffer *jbuf;
+ GMutex *jbuf_lock;
+ GCond *jbuf_cond;
+
+ /* some accounting */
+ guint64 num_late;
+ guint64 num_duplicates;
};
/* find a session with the given id */
sess = g_new0 (GstRDTManagerSession, 1);
sess->id = id;
sess->dec = rdtmanager;
+ sess->jbuf = rdt_jitter_buffer_new ();
+ sess->jbuf_lock = g_mutex_new ();
+ sess->jbuf_cond = g_cond_new ();
rdtmanager->sessions = g_slist_prepend (rdtmanager->sessions, sess);
return sess;
}
+static gboolean
+activate_session (GstRDTManager * rdtmanager, GstRDTManagerSession * session,
+ guint32 ssrc, guint8 pt)
+{
+ GstPadTemplate *templ;
+ GstElementClass *klass;
+ gchar *name;
+ GstCaps *caps;
+ GValue ret = { 0 };
+ GValue args[3] = { {0}
+ , {0}
+ , {0}
+ };
+
+ GST_DEBUG_OBJECT (rdtmanager, "creating stream");
+
+ session->ssrc = ssrc;
+ session->pt = pt;
+
+ /* get pt map */
+ g_value_init (&args[0], GST_TYPE_ELEMENT);
+ g_value_set_object (&args[0], rdtmanager);
+ g_value_init (&args[1], G_TYPE_UINT);
+ g_value_set_uint (&args[1], session->id);
+ g_value_init (&args[2], G_TYPE_UINT);
+ g_value_set_uint (&args[2], pt);
+
+ g_value_init (&ret, GST_TYPE_CAPS);
+ g_value_set_boxed (&ret, NULL);
+
+ g_signal_emitv (args, gst_rdt_manager_signals[SIGNAL_REQUEST_PT_MAP], 0,
+ &ret);
+
+ caps = (GstCaps *) g_value_get_boxed (&ret);
+
+ name = g_strdup_printf ("recv_rtp_src_%d_%u_%d", session->id, ssrc, pt);
+ klass = GST_ELEMENT_GET_CLASS (rdtmanager);
+ templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%d_%d_%d");
+ session->recv_rtp_src = gst_pad_new_from_template (templ, name);
+ g_free (name);
+
+ gst_pad_set_caps (session->recv_rtp_src, caps);
+
+ gst_pad_set_element_private (session->recv_rtp_src, session);
+ gst_pad_set_query_function (session->recv_rtp_src, gst_rdt_manager_query_src);
+ gst_pad_set_activatepush_function (session->recv_rtp_src,
+ gst_rdt_manager_src_activate_push);
+
+ gst_pad_set_active (session->recv_rtp_src, TRUE);
+ gst_element_add_pad (GST_ELEMENT_CAST (rdtmanager), session->recv_rtp_src);
+
+ return TRUE;
+}
+
static void
free_session (GstRDTManagerSession * session)
{
+ g_object_unref (session->jbuf);
+ g_cond_free (session->jbuf_cond);
+ g_mutex_free (session->jbuf_lock);
g_free (session);
}
-static guint gst_rdt_manager_signals[LAST_SIGNAL] = { 0 };
-
GST_BOILERPLATE (GstRDTManager, gst_rdt_manager, GstElement, GST_TYPE_ELEMENT);
static void
case GST_QUERY_LATENCY:
{
/* we pretend to be live with a 3 second latency */
- gst_query_set_latency (query, TRUE, 3 * GST_SECOND, -1);
+ gst_query_set_latency (query, TRUE, 5 * GST_SECOND, -1);
+ GST_DEBUG_OBJECT (rdtmanager, "reporting 5 seconds of latency");
res = TRUE;
break;
}
return res;
}
+static gboolean
+gst_rdt_manager_src_activate_push (GstPad * pad, gboolean active)
+{
+ gboolean result = TRUE;
+ GstRDTManager *rdtmanager;
+ GstRDTManagerSession *session;
+
+ session = gst_pad_get_element_private (pad);
+ rdtmanager = session->dec;
+
+ if (active) {
+ /* allow data processing */
+ JBUF_LOCK (session);
+ GST_DEBUG_OBJECT (rdtmanager, "Enabling pop on queue");
+ /* Mark as non flushing */
+ session->srcresult = GST_FLOW_OK;
+ gst_segment_init (&session->segment, GST_FORMAT_TIME);
+ session->last_popped_seqnum = -1;
+ session->last_out_time = -1;
+ session->next_seqnum = -1;
+ session->eos = FALSE;
+ JBUF_UNLOCK (session);
+
+ /* start pushing out buffers */
+ GST_DEBUG_OBJECT (rdtmanager, "Starting task on srcpad");
+ gst_pad_start_task (pad, (GstTaskFunction) gst_rdt_manager_loop, pad);
+ } else {
+ /* make sure all data processing stops ASAP */
+ JBUF_LOCK (session);
+ /* mark ourselves as flushing */
+ session->srcresult = GST_FLOW_WRONG_STATE;
+ GST_DEBUG_OBJECT (rdtmanager, "Disabling pop on queue");
+ /* this unblocks any waiting pops on the src pad task */
+ JBUF_SIGNAL (session);
+ /* unlock clock, we just unschedule, the entry will be released by
+ * the locking streaming thread. */
+ if (session->clock_id)
+ gst_clock_id_unschedule (session->clock_id);
+ JBUF_UNLOCK (session);
+
+ /* NOTE this will hardlock if the state change is called from the src pad
+ * task thread because we will _join() the thread. */
+ GST_DEBUG_OBJECT (rdtmanager, "Stopping task on srcpad");
+ result = gst_pad_stop_task (pad);
+ }
+
+ return result;
+}
+
static GstFlowReturn
-gst_rdt_manager_chain_rtp (GstPad * pad, GstBuffer * buffer)
+gst_rdt_manager_handle_data_packet (GstRDTManagerSession * session,
+ GstClockTime timestamp, GstRDTPacket * packet)
+{
+ GstRDTManager *rdtmanager;
+ guint16 seqnum;
+ gboolean tail;
+ GstFlowReturn res;
+ GstBuffer *buffer;
+
+ rdtmanager = session->dec;
+
+ res = GST_FLOW_OK;
+
+ seqnum = 0;
+ GST_DEBUG_OBJECT (rdtmanager,
+ "Received packet #%d at time %" GST_TIME_FORMAT, seqnum,
+ GST_TIME_ARGS (timestamp));
+
+ buffer = gst_rdt_packet_to_buffer (packet);
+
+ JBUF_LOCK_CHECK (session, out_flushing);
+
+ /* insert the packet into the queue now, FIXME, use seqnum */
+ if (!rdt_jitter_buffer_insert (session->jbuf, buffer, timestamp,
+ session->clock_rate, &tail))
+ goto duplicate;
+
+ /* signal addition of new buffer when the _loop is waiting. */
+ if (session->waiting)
+ JBUF_SIGNAL (session);
+
+finished:
+ JBUF_UNLOCK (session);
+
+ return res;
+
+ /* ERRORS */
+out_flushing:
+ {
+ res = session->srcresult;
+ GST_DEBUG_OBJECT (rdtmanager, "flushing %s", gst_flow_get_name (res));
+ gst_buffer_unref (buffer);
+ goto finished;
+ }
+duplicate:
+ {
+ GST_WARNING_OBJECT (rdtmanager, "Duplicate packet #%d detected, dropping",
+ seqnum);
+ session->num_duplicates++;
+ gst_buffer_unref (buffer);
+ goto finished;
+ }
+}
+
+static GstFlowReturn
+gst_rdt_manager_chain_rdt (GstPad * pad, GstBuffer * buffer)
{
GstFlowReturn res;
GstRDTManager *rdtmanager;
GstRDTManagerSession *session;
+ GstClockTime timestamp;
+ GstRDTPacket packet;
guint32 ssrc;
guint8 pt;
+ gboolean more;
rdtmanager = GST_RDT_MANAGER (GST_PAD_PARENT (pad));
- GST_DEBUG_OBJECT (rdtmanager, "got rtp packet");
+ GST_DEBUG_OBJECT (rdtmanager, "got RDT packet");
ssrc = 0;
pt = 0;
/* see if we have the pad */
if (!session->active) {
- GstPadTemplate *templ;
- GstElementClass *klass;
- gchar *name;
- GstCaps *caps;
- GValue ret = { 0 };
- GValue args[3] = { {0}
- , {0}
- , {0}
- };
-
- GST_DEBUG_OBJECT (rdtmanager, "creating stream");
-
- session->ssrc = ssrc;
- session->pt = pt;
-
- /* get pt map */
- g_value_init (&args[0], GST_TYPE_ELEMENT);
- g_value_set_object (&args[0], rdtmanager);
- g_value_init (&args[1], G_TYPE_UINT);
- g_value_set_uint (&args[1], session->id);
- g_value_init (&args[2], G_TYPE_UINT);
- g_value_set_uint (&args[2], pt);
-
- g_value_init (&ret, GST_TYPE_CAPS);
- g_value_set_boxed (&ret, NULL);
-
- g_signal_emitv (args, gst_rdt_manager_signals[SIGNAL_REQUEST_PT_MAP], 0,
- &ret);
-
- caps = (GstCaps *) g_value_get_boxed (&ret);
-
- name = g_strdup_printf ("recv_rtp_src_%d_%u_%d", session->id, ssrc, pt);
- klass = GST_ELEMENT_GET_CLASS (rdtmanager);
- templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%d_%d_%d");
- session->recv_rtp_src = gst_pad_new_from_template (templ, name);
- g_free (name);
-
- gst_pad_set_caps (session->recv_rtp_src, caps);
-
- gst_pad_set_element_private (session->recv_rtp_src, session);
- gst_pad_set_query_function (session->recv_rtp_src,
- gst_rdt_manager_query_src);
- gst_pad_set_active (session->recv_rtp_src, TRUE);
- gst_element_add_pad (GST_ELEMENT_CAST (rdtmanager), session->recv_rtp_src);
-
+ activate_session (rdtmanager, session, ssrc, pt);
session->active = TRUE;
}
- gst_buffer_set_caps (buffer, GST_PAD_CAPS (session->recv_rtp_src));
+ if (GST_BUFFER_IS_DISCONT (buffer)) {
+ GST_DEBUG_OBJECT (rdtmanager, "received discont");
+ session->discont = TRUE;
+ }
+
+ res = GST_FLOW_OK;
- res = gst_pad_push (session->recv_rtp_src, buffer);
+ /* take the timestamp of the buffer. This is the time when the packet was
+ * received and is used to calculate jitter and clock skew. We will adjust
+ * this timestamp with the smoothed value after processing it in the
+ * jitterbuffer. */
+ timestamp = GST_BUFFER_TIMESTAMP (buffer);
+ /* bring to running time */
+ timestamp = gst_segment_to_running_time (&session->segment, GST_FORMAT_TIME,
+ timestamp);
+
+ more = gst_rdt_buffer_get_first_packet (buffer, &packet);
+ while (more) {
+ GstRDTType type;
+
+ type = gst_rdt_packet_get_type (&packet);
+ GST_DEBUG_OBJECT (rdtmanager, "Have packet of type %04x", type);
+
+ if (GST_RDT_IS_DATA_TYPE (type)) {
+ GST_DEBUG_OBJECT (rdtmanager, "We have a data packet");
+ res = gst_rdt_manager_handle_data_packet (session, timestamp, &packet);
+ } else {
+ switch (type) {
+ default:
+ GST_DEBUG_OBJECT (rdtmanager, "Ignoring packet");
+ break;
+ }
+ }
+ if (res != GST_FLOW_OK)
+ break;
+
+ more = gst_rdt_packet_move_to_next (&packet);
+ }
+
+ gst_buffer_unref (buffer);
return res;
}
+/* push packets from the queue to the downstream demuxer */
+static void
+gst_rdt_manager_loop (GstPad * pad)
+{
+ GstRDTManager *rdtmanager;
+ GstRDTManagerSession *session;
+ GstBuffer *buffer;
+ GstFlowReturn result;
+
+ rdtmanager = GST_RDT_MANAGER (GST_PAD_PARENT (pad));
+
+ session = gst_pad_get_element_private (pad);
+
+ JBUF_LOCK_CHECK (session, flushing);
+ GST_DEBUG_OBJECT (rdtmanager, "Peeking item");
+ while (TRUE) {
+ /* always wait if we are blocked */
+ if (!session->blocked) {
+ /* if we have a packet, we can exit the loop and grab it */
+ if (rdt_jitter_buffer_num_packets (session->jbuf) > 0)
+ break;
+ /* no packets but we are EOS, do eos logic */
+ if (session->eos)
+ goto do_eos;
+ }
+ /* underrun, wait for packets or flushing now */
+ session->waiting = TRUE;
+ JBUF_WAIT_CHECK (session, flushing);
+ session->waiting = FALSE;
+ }
+
+ buffer = rdt_jitter_buffer_pop (session->jbuf);
+
+ GST_DEBUG_OBJECT (rdtmanager, "Got item %p", buffer);
+
+ if (session->discont) {
+ GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+ session->discont = FALSE;
+ }
+
+ gst_buffer_set_caps (buffer, GST_PAD_CAPS (session->recv_rtp_src));
+ JBUF_UNLOCK (session);
+
+ result = gst_pad_push (session->recv_rtp_src, buffer);
+ if (result != GST_FLOW_OK)
+ goto pause;
+
+ return;
+
+ /* ERRORS */
+flushing:
+ {
+ GST_DEBUG_OBJECT (rdtmanager, "we are flushing");
+ gst_pad_pause_task (session->recv_rtp_src);
+ JBUF_UNLOCK (session);
+ return;
+ }
+do_eos:
+ {
+ /* store result, we are flushing now */
+ GST_DEBUG_OBJECT (rdtmanager, "We are EOS, pushing EOS downstream");
+ session->srcresult = GST_FLOW_UNEXPECTED;
+ gst_pad_pause_task (session->recv_rtp_src);
+ gst_pad_push_event (session->recv_rtp_src, gst_event_new_eos ());
+ JBUF_UNLOCK (session);
+ return;
+ }
+pause:
+ {
+ const gchar *reason = gst_flow_get_name (result);
+
+ GST_DEBUG_OBJECT (rdtmanager, "pausing task, reason %s", reason);
+
+ JBUF_LOCK (session);
+ /* store result */
+ session->srcresult = result;
+ /* we don't post errors or anything because upstream will do that for us
+ * when we pass the return value upstream. */
+ gst_pad_pause_task (session->recv_rtp_src);
+ JBUF_UNLOCK (session);
+ return;
+ }
+}
+
static GstFlowReturn
gst_rdt_manager_chain_rtcp (GstPad * pad, GstBuffer * buffer)
{
session->recv_rtp_sink = gst_pad_new_from_template (templ, name);
gst_pad_set_element_private (session->recv_rtp_sink, session);
gst_pad_set_chain_function (session->recv_rtp_sink,
- gst_rdt_manager_chain_rtp);
+ gst_rdt_manager_chain_rdt);
gst_pad_set_active (session->recv_rtp_sink, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rdtmanager), session->recv_rtp_sink);