static gboolean gst_rtp_rtx_queue_src_event (GstPad * pad, GstObject * parent,
GstEvent * event);
+static gboolean gst_rtp_rtx_queue_sink_event (GstPad * pad, GstObject * parent,
+ GstEvent * event);
static GstFlowReturn gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent,
GstBuffer * buffer);
static GstFlowReturn gst_rtp_rtx_queue_chain_list (GstPad * pad,
"sink"), "sink");
GST_PAD_SET_PROXY_CAPS (rtx->sinkpad);
GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
+ gst_pad_set_event_function (rtx->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_sink_event));
gst_pad_set_chain_function (rtx->sinkpad,
GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain));
gst_pad_set_chain_list_function (rtx->sinkpad,
if (data->found)
return;
- if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpbuffer))
+ if (!GST_IS_BUFFER (buffer) ||
+ !gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpbuffer))
return;
seqnum = gst_rtp_buffer_get_seq (&rtpbuffer);
return res;
}
+static gboolean
+gst_rtp_rtx_queue_sink_event (GstPad * pad, GstObject * parent,
+ GstEvent * event)
+{
+ GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (parent);
+ gboolean res;
+
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_SEGMENT:
+ {
+ g_mutex_lock (&rtx->lock);
+ gst_event_copy_segment (event, &rtx->head_segment);
+ g_queue_push_head (rtx->queue, gst_event_ref (event));
+ g_mutex_unlock (&rtx->lock);
+ /* fall through */
+ }
+ default:
+ res = gst_pad_event_default (pad, parent, event);
+ break;
+ }
+ return res;
+}
+
static void
do_push (GstBuffer * buffer, GstRTPRtxQueue * rtx)
{
gst_pad_push (rtx->srcpad, buffer);
}
+static guint32
+get_ts_diff (GstRTPRtxQueue * rtx)
+{
+ GstClockTime high_ts, low_ts;
+ GstClockTimeDiff result;
+ GstBuffer *high_buf, *low_buf;
+
+ high_buf = g_queue_peek_head (rtx->queue);
+
+ while (GST_IS_EVENT ((low_buf = g_queue_peek_tail (rtx->queue)))) {
+ GstEvent *event = g_queue_pop_tail (rtx->queue);
+ gst_event_copy_segment (event, &rtx->tail_segment);
+ gst_event_unref (event);
+ }
+
+ if (!high_buf || !low_buf || high_buf == low_buf)
+ return 0;
+
+ high_ts = GST_BUFFER_TIMESTAMP (high_buf);
+ low_ts = GST_BUFFER_TIMESTAMP (low_buf);
+
+ high_ts = gst_segment_to_running_time (&rtx->head_segment, GST_FORMAT_TIME,
+ high_ts);
+ low_ts = gst_segment_to_running_time (&rtx->tail_segment, GST_FORMAT_TIME,
+ low_ts);
+
+ result = high_ts - low_ts;
+
+ /* return value in ms instead of ns */
+ return (guint32) gst_util_uint64_scale_int (result, 1, GST_MSECOND);
+}
+
/* Must be called with rtx->lock */
static void
shrink_queue (GstRTPRtxQueue * rtx)
while (g_queue_get_length (rtx->queue) > rtx->max_size_packets)
gst_buffer_unref (g_queue_pop_tail (rtx->queue));
}
+ if (rtx->max_size_time) {
+ while (get_ts_diff (rtx) > rtx->max_size_time)
+ gst_buffer_unref (g_queue_pop_tail (rtx->queue));
+ }
}
static GstFlowReturn