{
GstRtpJitterBufferPrivate *priv;
GstClockTime last_out;
- GstBuffer *head;
+ RTPJitterBufferItem *item;
priv = jbuf->priv;
if (!active) {
rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
}
- if ((head = rtp_jitter_buffer_peek (priv->jbuf))) {
+ if ((item = rtp_jitter_buffer_peek (priv->jbuf))) {
/* head buffer timestamp and offset gives our output time */
- last_out = GST_BUFFER_DTS (head) + priv->ts_offset;
+ last_out = item->dts + priv->ts_offset;
} else {
/* use last known time when the buffer is empty */
last_out = priv->last_out_time;
}
}
+static RTPJitterBufferItem *
+alloc_item (void)
+{
+ return g_slice_new (RTPJitterBufferItem);
+}
+
+static void
+free_item (RTPJitterBufferItem * item)
+{
+ if (item->data)
+ gst_mini_object_unref (item->data);
+ g_slice_free (RTPJitterBufferItem, item);
+}
+
static GstFlowReturn
gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
GstBuffer * buffer)
guint8 pt;
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
gboolean do_next_seqnum = FALSE;
+ RTPJitterBufferItem *item;
jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
- GstBuffer *old_buf;
-
- old_buf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
+ RTPJitterBufferItem *old_item;
+ old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
- old_buf);
-
- gst_buffer_unref (old_buf);
+ old_item);
+ free_item (old_item);
}
}
- /* we need to make the metadata writable before pushing it in the jitterbuffer
- * because the jitterbuffer will update the PTS */
- buffer = gst_buffer_make_writable (buffer);
- GST_BUFFER_DTS (buffer) = dts;
- GST_BUFFER_PTS (buffer) = pts;
+ item = alloc_item ();
+ item->data = buffer;
+ item->next = NULL;
+ item->prev = NULL;
+ item->dts = dts;
+ item->pts = pts;
+ item->seqnum = seqnum;
+ item->rtptime = rtptime;
/* now insert the packet into the queue in sorted order. This function returns
* FALSE if a packet with the same seqnum was already in the queue, meaning we
* have a duplicate. */
- if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, buffer, dts,
+ if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item,
&tail, &percent)))
goto duplicate;
GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
seqnum);
priv->num_duplicates++;
- gst_buffer_unref (buffer);
+ free_item (item);
goto finished;
}
}
static GstClockTime
-compute_elapsed (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
+compute_elapsed (GstRtpJitterBuffer * jitterbuffer, RTPJitterBufferItem * item)
{
guint64 ext_time, elapsed;
guint32 rtp_time;
GstRtpJitterBufferPrivate *priv;
- GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
priv = jitterbuffer->priv;
- gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
- rtp_time = gst_rtp_buffer_get_timestamp (&rtp);
- gst_rtp_buffer_unmap (&rtp);
+ rtp_time = item->rtptime;
GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
}
static void
-update_estimated_eos (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
+update_estimated_eos (GstRtpJitterBuffer * jitterbuffer,
+ RTPJitterBufferItem * item)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
&& priv->clock_base != -1 && priv->clock_rate > 0) {
guint64 elapsed, estimated;
- elapsed = compute_elapsed (jitterbuffer, outbuf);
+ elapsed = compute_elapsed (jitterbuffer, item);
if (elapsed > priv->last_elapsed || !priv->last_elapsed) {
guint64 left;
GST_LOG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT,
GST_TIME_ARGS (left));
- out_time = GST_BUFFER_DTS (outbuf);
+ out_time = item->dts;
if (elapsed > 0)
estimated = gst_util_uint64_scale (out_time, left, elapsed);
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GstFlowReturn result;
+ RTPJitterBufferItem *item;
GstBuffer *outbuf;
GstClockTime dts, pts;
gint percent = -1;
/* when we get here we are ready to pop and push the buffer */
- outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
+ item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
check_buffering_percent (jitterbuffer, &percent);
+ /* we need to make writable to change the flags and timestamps */
+ outbuf = gst_buffer_make_writable (item->data);
+ item->data = NULL;
+
if (G_UNLIKELY (priv->discont)) {
/* set DISCONT flag when we missed a packet. We pushed the buffer writable
* into the jitterbuffer so we can modify now. */
priv->ts_discont = FALSE;
}
- dts = GST_BUFFER_DTS (outbuf);
- pts = GST_BUFFER_PTS (outbuf);
-
- dts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, dts);
- pts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, pts);
+ dts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->dts);
+ pts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->pts);
/* apply timestamp with offset to buffer now */
GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
/* update the elapsed time when we need to check against the npt stop time. */
- update_estimated_eos (jitterbuffer, outbuf);
+ update_estimated_eos (jitterbuffer, item);
/* now we are ready to push the buffer. Save the seqnum and release the lock
* so the other end can push stuff in the queue again. */
priv->next_seqnum = (seqnum + 1) & 0xffff;
JBUF_UNLOCK (priv);
+ free_item (item);
+
if (percent != -1)
post_buffering_percent (jitterbuffer, percent);
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GstFlowReturn result = GST_FLOW_OK;
- GstBuffer *outbuf;
+ RTPJitterBufferItem *item;
guint16 seqnum;
guint32 next_seqnum;
gint gap;
- GstRTPBuffer rtp = { NULL, };
/* only push buffers when PLAYING and active and not buffering */
if (priv->blocked || !priv->active ||
* If all is fine, we'll pop and push it. If the sequence number is wrong we
* wait for a timeout or something to change.
* The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
- outbuf = rtp_jitter_buffer_peek (priv->jbuf);
- if (outbuf == NULL)
+ item = rtp_jitter_buffer_peek (priv->jbuf);
+ if (item == NULL)
goto wait;
/* get the seqnum and the next expected seqnum */
- gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
- seqnum = gst_rtp_buffer_get_seq (&rtp);
- gst_rtp_buffer_unmap (&rtp);
+ seqnum = item->seqnum;
next_seqnum = priv->next_seqnum;
/* no missing packet, pop and push */
result = pop_and_push_next (jitterbuffer, seqnum);
} else if (G_UNLIKELY (gap < 0)) {
+ RTPJitterBufferItem *item;
/* if we have a packet that we already pushed or considered dropped, pop it
* off and get the next packet */
GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
seqnum, next_seqnum);
- outbuf = rtp_jitter_buffer_pop (priv->jbuf, NULL);
- gst_buffer_unref (outbuf);
+ item = rtp_jitter_buffer_pop (priv->jbuf, NULL);
+ free_item (item);
goto again;
} else {
/* the chain function has scheduled timers to request retransmission or
static guint64
get_buffer_level (RTPJitterBuffer * jbuf)
{
- GstBuffer *high_buf = NULL, *low_buf = NULL;
+ RTPJitterBufferItem *high_buf = NULL, *low_buf = NULL;
guint64 level;
- GList *find;
/* first first buffer with timestamp */
- find = g_queue_peek_head_link (jbuf->packets);
- while (find) {
- high_buf = find->data;
- if (GST_BUFFER_TIMESTAMP (high_buf) != -1)
+ high_buf = (RTPJitterBufferItem *) g_queue_peek_head_link (jbuf->packets);
+ while (high_buf) {
+ if (high_buf->dts != -1)
break;
- high_buf = NULL;
- find = g_list_next (find);
+ high_buf = (RTPJitterBufferItem *) g_list_next (high_buf);
}
- find = g_queue_peek_tail_link (jbuf->packets);
- while (find) {
- low_buf = find->data;
- if (GST_BUFFER_TIMESTAMP (low_buf) != -1)
+ low_buf = (RTPJitterBufferItem *) g_queue_peek_tail_link (jbuf->packets);
+ while (low_buf) {
+ if (low_buf->dts != -1)
break;
- low_buf = NULL;
- find = g_list_previous (find);
+ low_buf = (RTPJitterBufferItem *) g_list_previous (low_buf);
}
if (!high_buf || !low_buf || high_buf == low_buf) {
} else {
guint64 high_ts, low_ts;
- high_ts = GST_BUFFER_TIMESTAMP (high_buf);
- low_ts = GST_BUFFER_TIMESTAMP (low_buf);
+ high_ts = high_buf->dts;
+ low_ts = low_buf->dts;
if (high_ts > low_ts)
level = high_ts - low_ts;
return out_time;
}
+static void
+queue_do_insert (RTPJitterBuffer * jbuf, GList * list, GList * item)
+{
+ GQueue *queue = jbuf->packets;
+ GList *walk;
+
+ /* It's more likely that the packet was inserted in the front of the buffer */
+ if (G_LIKELY (list)) {
+ item->prev = list->prev;
+ item->next = list;
+ list->prev = item;
+ if (item->prev) {
+ item->prev->next = item;
+ } else {
+ queue->head = item;
+ }
+ } else {
+ queue->tail = g_list_concat (queue->tail, item);
+ if (queue->tail->next)
+ queue->tail = queue->tail->next;
+ else
+ queue->head = queue->tail;
+ }
+ queue->length++;
+
+ GST_DEBUG ("head %p, tail %p", queue->head, queue->tail);
+ for (walk = queue->head; walk; walk = walk->next) {
+ RTPJitterBufferItem *item = (RTPJitterBufferItem *) walk;
+ GST_DEBUG ("item %p, next %p, prev %p, #%d",
+ item, item->next, item->prev, item->seqnum);
+ }
+}
+
/**
* rtp_jitter_buffer_insert:
* @jbuf: an #RTPJitterBuffer
- * @buf: a buffer
- * @time: a running_time when this buffer was received in nanoseconds
- * @max_delay: the maximum lateness of @buf
+ * @item: an #RTPJitterBufferItem to insert
* @tail: TRUE when the tail element changed.
+ * @percent: the buffering percent after insertion
*
- * Inserts @buf into the packet queue of @jbuf. The sequence number of the
+ * Inserts @item into the packet queue of @jbuf. The sequence number of the
* packet will be used to sort the packets. This function takes ownerhip of
* @buf when the function returns %TRUE.
- * @buf should have writable metadata when calling this function.
*
* Returns: %FALSE if a packet with the same number already existed.
*/
gboolean
-rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf,
- GstClockTime time, gboolean * tail, gint * percent)
+rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, RTPJitterBufferItem * item,
+ gboolean * tail, gint * percent)
{
GList *list;
guint32 rtptime;
guint16 seqnum;
- GstRTPBuffer rtp = { NULL };
+ GstClockTime dts;
g_return_val_if_fail (jbuf != NULL, FALSE);
- g_return_val_if_fail (buf != NULL, FALSE);
-
- gst_rtp_buffer_map (buf, GST_MAP_READ, &rtp);
+ g_return_val_if_fail (item != NULL, FALSE);
- seqnum = gst_rtp_buffer_get_seq (&rtp);
+ seqnum = item->seqnum;
/* loop the list to skip strictly smaller seqnum buffers */
for (list = jbuf->packets->head; list; list = g_list_next (list)) {
guint16 qseq;
gint gap;
- GstRTPBuffer rtpb = { NULL };
+ RTPJitterBufferItem *qitem = (RTPJitterBufferItem *) list;
- gst_rtp_buffer_map (GST_BUFFER_CAST (list->data), GST_MAP_READ, &rtpb);
- qseq = gst_rtp_buffer_get_seq (&rtpb);
- gst_rtp_buffer_unmap (&rtpb);
+ qseq = qitem->seqnum;
/* compare the new seqnum to the one in the buffer */
gap = gst_rtp_buffer_compare_seqnum (seqnum, qseq);
break;
}
- rtptime = gst_rtp_buffer_get_timestamp (&rtp);
+ dts = item->dts;
+ rtptime = item->rtptime;
+
/* rtp time jumps are checked for during skew calculation, but bypassed
* in other mode, so mind those here and reset jb if needed.
* Only reset if valid input time, which is likely for UDP input
* where we expect this might happen due to async thread effects
* (in seek and state change cycles), but not so much for TCP input */
- if (GST_CLOCK_TIME_IS_VALID (time) &&
+ if (GST_CLOCK_TIME_IS_VALID (dts) &&
jbuf->mode != RTP_JITTER_BUFFER_MODE_SLAVE &&
jbuf->base_time != -1 && jbuf->last_rtptime != -1) {
GstClockTime ext_rtptime = jbuf->ext_rtptime;
* mode we will adjust the outgoing timestamps according to the amount of
* time we spent buffering. */
if (jbuf->base_time == -1)
- time = 0;
+ dts = 0;
else
- time = -1;
+ dts = -1;
break;
case RTP_JITTER_BUFFER_MODE_SLAVE:
default:
break;
}
/* do skew calculation by measuring the difference between rtptime and the
- * receive time, this function will retimestamp @buf with the skew corrected
- * running time. */
- time = calculate_skew (jbuf, rtptime, time);
- GST_BUFFER_PTS (buf) = time;
+ * receive dts, this function will return the skew corrected rtptime. */
+ item->pts = calculate_skew (jbuf, rtptime, dts);
- /* It's more likely that the packet was inserted in the front of the buffer */
- if (G_LIKELY (list))
- g_queue_insert_before (jbuf->packets, list, buf);
- else
- g_queue_push_tail (jbuf->packets, buf);
+ queue_do_insert (jbuf, list, (GList *) item);
/* buffering mode, update buffer stats */
if (jbuf->mode == RTP_JITTER_BUFFER_MODE_BUFFER)
if (G_LIKELY (tail))
*tail = (list == NULL);
- gst_rtp_buffer_unmap (&rtp);
-
return TRUE;
/* ERRORS */
duplicate:
{
- gst_rtp_buffer_unmap (&rtp);
GST_WARNING ("duplicate packet %d found", (gint) seqnum);
return FALSE;
}
*
* Returns: a #GstBuffer or %NULL when there was no packet in the queue.
*/
-GstBuffer *
+RTPJitterBufferItem *
rtp_jitter_buffer_pop (RTPJitterBuffer * jbuf, gint * percent)
{
- GstBuffer *buf;
+ GList *item = NULL;
+ GQueue *queue;
g_return_val_if_fail (jbuf != NULL, NULL);
- buf = g_queue_pop_tail (jbuf->packets);
+ queue = jbuf->packets;
+
+ item = queue->tail;
+ if (item) {
+ queue->tail = item->prev;
+ if (queue->tail)
+ queue->tail->next = NULL;
+ else
+ queue->head = NULL;
+ queue->length--;
+ }
/* buffering mode, update buffer stats */
if (jbuf->mode == RTP_JITTER_BUFFER_MODE_BUFFER)
else if (percent)
*percent = -1;
- return buf;
+ return (RTPJitterBufferItem *) item;
}
/**
*
* Returns: a #GstBuffer or %NULL when there was no packet in the queue.
*/
-GstBuffer *
+RTPJitterBufferItem *
rtp_jitter_buffer_peek (RTPJitterBuffer * jbuf)
{
- GstBuffer *buf;
-
g_return_val_if_fail (jbuf != NULL, NULL);
- buf = g_queue_peek_tail (jbuf->packets);
-
- return buf;
+ return (RTPJitterBufferItem *) jbuf->packets->tail;
}
/**
typedef struct _RTPJitterBuffer RTPJitterBuffer;
typedef struct _RTPJitterBufferClass RTPJitterBufferClass;
+typedef struct _RTPJitterBufferItem RTPJitterBufferItem;
#define RTP_TYPE_JITTER_BUFFER (rtp_jitter_buffer_get_type())
#define RTP_JITTER_BUFFER(src) (G_TYPE_CHECK_INSTANCE_CAST((src),RTP_TYPE_JITTER_BUFFER,RTPJitterBuffer))
GObjectClass parent_class;
};
+/**
+ * RTPJitterBufferItem:
+ * @data: the data of the item
+ * @next: pointer to next item
+ * @prev: pointer to previous item
+ * @dts: input DTS
+ * @pts: output PTS
+ * @seqnum: seqnum
+ * @rtptime: rtp timestamp
+ *
+ * An object containing an RTP packet or event.
+ */
+struct _RTPJitterBufferItem {
+ gpointer data;
+ GList *next;
+ GList *prev;
+ GstClockTime dts;
+ GstClockTime pts;
+ guint seqnum;
+ guint rtptime;
+};
+
GType rtp_jitter_buffer_get_type (void);
/* managing lifetime */
void rtp_jitter_buffer_reset_skew (RTPJitterBuffer *jbuf);
-gboolean rtp_jitter_buffer_insert (RTPJitterBuffer *jbuf, GstBuffer *buf,
- GstClockTime time,
+gboolean rtp_jitter_buffer_insert (RTPJitterBuffer *jbuf,
+ RTPJitterBufferItem *item,
gboolean *tail, gint *percent);
-GstBuffer * rtp_jitter_buffer_peek (RTPJitterBuffer *jbuf);
-GstBuffer * rtp_jitter_buffer_pop (RTPJitterBuffer *jbuf, gint *percent);
+RTPJitterBufferItem * rtp_jitter_buffer_peek (RTPJitterBuffer *jbuf);
+RTPJitterBufferItem * rtp_jitter_buffer_pop (RTPJitterBuffer *jbuf, gint *percent);
void rtp_jitter_buffer_flush (RTPJitterBuffer *jbuf);