1 /* RTP Retransmission queue element for GStreamer
5 * Copyright (C) 2013 Wim Taymans <wim.taymans@gmail.com>
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20 * Boston, MA 02110-1301, USA.
24 * SECTION:element-rtprtxqueue
32 #include <gst/rtp/gstrtpbuffer.h>
35 #include "gstrtprtxqueue.h"
/* Debug category used by the GST_DEBUG_* calls in this file; it is
 * initialised in the plugin init function. */
GST_DEBUG_CATEGORY_STATIC (gst_rtp_rtx_queue_debug);
#define GST_CAT_DEFAULT gst_rtp_rtx_queue_debug

/* Property defaults. For both limits a value of 0 means "unlimited". */
#define DEFAULT_MAX_SIZE_TIME    0
#define DEFAULT_MAX_SIZE_PACKETS 100
50 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
53 GST_STATIC_CAPS ("application/x-rtp")
56 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink",
59 GST_STATIC_CAPS ("application/x-rtp")
62 static gboolean gst_rtp_rtx_queue_src_event (GstPad * pad, GstObject * parent,
64 static GstFlowReturn gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent,
66 static GstFlowReturn gst_rtp_rtx_queue_chain_list (GstPad * pad,
67 GstObject * parent, GstBufferList * list);
69 static GstStateChangeReturn gst_rtp_rtx_queue_change_state (GstElement *
70 element, GstStateChange transition);
72 static void gst_rtp_rtx_queue_set_property (GObject * object, guint prop_id,
73 const GValue * value, GParamSpec * pspec);
74 static void gst_rtp_rtx_queue_get_property (GObject * object, guint prop_id,
75 GValue * value, GParamSpec * pspec);
76 static void gst_rtp_rtx_queue_finalize (GObject * object);
78 G_DEFINE_TYPE (GstRTPRtxQueue, gst_rtp_rtx_queue, GST_TYPE_ELEMENT);
81 gst_rtp_rtx_queue_class_init (GstRTPRtxQueueClass * klass)
83 GObjectClass *gobject_class;
84 GstElementClass *gstelement_class;
86 gobject_class = (GObjectClass *) klass;
87 gstelement_class = (GstElementClass *) klass;
89 gobject_class->get_property = gst_rtp_rtx_queue_get_property;
90 gobject_class->set_property = gst_rtp_rtx_queue_set_property;
91 gobject_class->finalize = gst_rtp_rtx_queue_finalize;
93 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
94 g_param_spec_uint ("max-size-time", "Max Size Times",
95 "Amount of ms to queue (0 = unlimited)", 0, G_MAXUINT,
96 DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
98 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_PACKETS,
99 g_param_spec_uint ("max-size-packets", "Max Size Packets",
100 "Amount of packets to queue (0 = unlimited)", 0, G_MAXUINT,
101 DEFAULT_MAX_SIZE_PACKETS,
102 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
104 gst_element_class_add_pad_template (gstelement_class,
105 gst_static_pad_template_get (&src_factory));
106 gst_element_class_add_pad_template (gstelement_class,
107 gst_static_pad_template_get (&sink_factory));
109 gst_element_class_set_static_metadata (gstelement_class,
110 "RTP Retransmission Queue", "Codec",
111 "Keep RTP packets in a queue for retransmission",
112 "Wim Taymans <wim.taymans@gmail.com>");
114 gstelement_class->change_state =
115 GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_change_state);
119 gst_rtp_rtx_queue_reset (GstRTPRtxQueue * rtx, gboolean full)
121 g_mutex_lock (&rtx->lock);
122 g_queue_foreach (rtx->queue, (GFunc) gst_buffer_unref, NULL);
123 g_queue_clear (rtx->queue);
124 g_list_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL);
125 g_list_free (rtx->pending);
127 g_mutex_unlock (&rtx->lock);
131 gst_rtp_rtx_queue_finalize (GObject * object)
133 GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (object);
135 gst_rtp_rtx_queue_reset (rtx, TRUE);
136 g_queue_free (rtx->queue);
137 g_mutex_clear (&rtx->lock);
139 G_OBJECT_CLASS (gst_rtp_rtx_queue_parent_class)->finalize (object);
143 gst_rtp_rtx_queue_init (GstRTPRtxQueue * rtx)
145 GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx);
148 gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
150 GST_PAD_SET_PROXY_CAPS (rtx->srcpad);
151 GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad);
152 gst_pad_set_event_function (rtx->srcpad,
153 GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_src_event));
154 gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad);
157 gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
159 GST_PAD_SET_PROXY_CAPS (rtx->sinkpad);
160 GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
161 gst_pad_set_chain_function (rtx->sinkpad,
162 GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain));
163 gst_pad_set_chain_list_function (rtx->sinkpad,
164 GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain_list));
165 gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
167 rtx->queue = g_queue_new ();
168 g_mutex_init (&rtx->lock);
170 rtx->max_size_time = DEFAULT_MAX_SIZE_TIME;
171 rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS;
182 push_seqnum (GstBuffer * buffer, RTXData * data)
184 GstRTPRtxQueue *rtx = data->rtx;
185 GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
191 if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpbuffer))
194 seqnum = gst_rtp_buffer_get_seq (&rtpbuffer);
195 gst_rtp_buffer_unmap (&rtpbuffer);
197 if (seqnum == data->seqnum) {
199 GST_DEBUG_OBJECT (rtx, "found %d", seqnum);
200 rtx->pending = g_list_prepend (rtx->pending, gst_buffer_ref (buffer));
205 gst_rtp_rtx_queue_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
207 GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (parent);
210 switch (GST_EVENT_TYPE (event)) {
211 case GST_EVENT_CUSTOM_UPSTREAM:
213 const GstStructure *s;
215 s = gst_event_get_structure (event);
216 if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
220 if (!gst_structure_get_uint (s, "seqnum", &seqnum))
223 GST_DEBUG_OBJECT (rtx, "request %d", seqnum);
225 g_mutex_lock (&rtx->lock);
227 data.seqnum = seqnum;
229 g_queue_foreach (rtx->queue, (GFunc) push_seqnum, &data);
230 g_mutex_unlock (&rtx->lock);
232 gst_event_unref (event);
235 res = gst_pad_event_default (pad, parent, event);
240 res = gst_pad_event_default (pad, parent, event);
247 do_push (GstBuffer * buffer, GstRTPRtxQueue * rtx)
249 gst_pad_push (rtx->srcpad, buffer);
252 /* Must be called with rtx->lock */
254 shrink_queue (GstRTPRtxQueue * rtx)
256 if (rtx->max_size_packets) {
257 while (g_queue_get_length (rtx->queue) > rtx->max_size_packets)
258 gst_buffer_unref (g_queue_pop_tail (rtx->queue));
263 gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
269 rtx = GST_RTP_RTX_QUEUE (parent);
271 g_mutex_lock (&rtx->lock);
272 g_queue_push_head (rtx->queue, gst_buffer_ref (buffer));
275 pending = rtx->pending;
277 g_mutex_unlock (&rtx->lock);
279 pending = g_list_reverse (pending);
280 g_list_foreach (pending, (GFunc) do_push, rtx);
281 g_list_free (pending);
283 ret = gst_pad_push (rtx->srcpad, buffer);
289 push_to_queue (GstBuffer ** buffer, guint idx, gpointer user_data)
291 GQueue *queue = user_data;
293 g_queue_push_head (queue, gst_buffer_ref (*buffer));
299 gst_rtp_rtx_queue_chain_list (GstPad * pad, GstObject * parent,
300 GstBufferList * list)
306 rtx = GST_RTP_RTX_QUEUE (parent);
308 g_mutex_lock (&rtx->lock);
309 gst_buffer_list_foreach (list, push_to_queue, rtx->queue);
312 pending = rtx->pending;
314 g_mutex_unlock (&rtx->lock);
316 pending = g_list_reverse (pending);
317 g_list_foreach (pending, (GFunc) do_push, rtx);
318 g_list_free (pending);
320 ret = gst_pad_push_list (rtx->srcpad, list);
326 gst_rtp_rtx_queue_get_property (GObject * object,
327 guint prop_id, GValue * value, GParamSpec * pspec)
329 GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (object);
332 case PROP_MAX_SIZE_TIME:
333 g_value_set_uint (value, rtx->max_size_time);
335 case PROP_MAX_SIZE_PACKETS:
336 g_value_set_uint (value, rtx->max_size_packets);
339 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
345 gst_rtp_rtx_queue_set_property (GObject * object,
346 guint prop_id, const GValue * value, GParamSpec * pspec)
348 GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (object);
351 case PROP_MAX_SIZE_TIME:
352 rtx->max_size_time = g_value_get_uint (value);
354 case PROP_MAX_SIZE_PACKETS:
355 rtx->max_size_packets = g_value_get_uint (value);
358 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
363 static GstStateChangeReturn
364 gst_rtp_rtx_queue_change_state (GstElement * element, GstStateChange transition)
366 GstStateChangeReturn ret;
369 rtx = GST_RTP_RTX_QUEUE (element);
371 switch (transition) {
377 GST_ELEMENT_CLASS (gst_rtp_rtx_queue_parent_class)->change_state (element,
380 switch (transition) {
381 case GST_STATE_CHANGE_PAUSED_TO_READY:
382 gst_rtp_rtx_queue_reset (rtx, TRUE);
392 gst_rtp_rtx_queue_plugin_init (GstPlugin * plugin)
394 GST_DEBUG_CATEGORY_INIT (gst_rtp_rtx_queue_debug, "rtprtxqueue", 0,
395 "rtp retransmission queue");
397 return gst_element_register (plugin, "rtprtxqueue", GST_RANK_NONE,
398 GST_TYPE_RTP_RTX_QUEUE);