}
static void
+gst_rtp_rtx_queue_reset (GstRTPRtxQueue * rtx, gboolean full)
+{
+ g_mutex_lock (&rtx->lock);
+ g_queue_foreach (rtx->queue, (GFunc) gst_buffer_unref, NULL);
+ g_queue_clear (rtx->queue);
+ g_list_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL);
+ g_list_free (rtx->pending);
+ rtx->pending = NULL;
+ g_mutex_unlock (&rtx->lock);
+}
+
+static void
gst_rtp_rtx_queue_finalize (GObject * object)
{
GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (object);
+ gst_rtp_rtx_queue_reset (rtx, TRUE);
g_queue_free (rtx->queue);
+ g_mutex_clear (&rtx->lock);
G_OBJECT_CLASS (gst_rtp_rtx_queue_parent_class)->finalize (object);
}
GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain));
gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
-
rtx->queue = g_queue_new ();
+ g_mutex_init (&rtx->lock);
}
typedef struct
{
GstRTPRtxQueue *rtx;
guint seqnum;
+ gboolean found;
} RTXData;
static void
GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
guint16 seqnum;
+ if (data->found)
+ return;
+
if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpbuffer))
return;
gst_rtp_buffer_unmap (&rtpbuffer);
if (seqnum == data->seqnum) {
- gst_pad_push (rtx->srcpad, gst_buffer_ref (buffer));
+ data->found = TRUE;
+ GST_DEBUG_OBJECT (rtx, "found %d", seqnum);
+ rtx->pending = g_list_prepend (rtx->pending, gst_buffer_ref (buffer));
}
}
if (!gst_structure_get_uint (s, "seqnum", &seqnum))
seqnum = -1;
+ GST_DEBUG_OBJECT (rtx, "request %d", seqnum);
+
+ g_mutex_lock (&rtx->lock);
data.rtx = rtx;
data.seqnum = seqnum;
+ data.found = FALSE;
g_queue_foreach (rtx->queue, (GFunc) push_seqnum, &data);
+ g_mutex_unlock (&rtx->lock);
+
gst_event_unref (event);
res = TRUE;
} else {
return res;
}
+static void
+do_push (GstBuffer * buffer, GstRTPRtxQueue * rtx)
+{
+ gst_pad_push (rtx->srcpad, buffer);
+}
+
static GstFlowReturn
gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
GstRTPRtxQueue *rtx;
GstFlowReturn ret;
+ GList *pending;
rtx = GST_RTP_RTX_QUEUE (parent);
+ g_mutex_lock (&rtx->lock);
g_queue_push_head (rtx->queue, gst_buffer_ref (buffer));
while (g_queue_get_length (rtx->queue) > 100) {
gst_buffer_unref (g_queue_pop_tail (rtx->queue));
}
+ pending = rtx->pending;
+ rtx->pending = NULL;
+ g_mutex_unlock (&rtx->lock);
+
+ g_list_foreach (pending, (GFunc) do_push, rtx);
+ g_list_free (pending);
ret = gst_pad_push (rtx->srcpad, buffer);
gst_rtp_rtx_queue_change_state (GstElement * element, GstStateChange transition)
{
GstStateChangeReturn ret;
+ GstRTPRtxQueue *rtx;
+
+ rtx = GST_RTP_RTX_QUEUE (element);
switch (transition) {
default:
transition);
switch (transition) {
+ case GST_STATE_CHANGE_PAUSED_TO_READY:
+ gst_rtp_rtx_queue_reset (rtx, TRUE);
+ break;
default:
break;
}