rtxqueue: add property to configure queue size
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtprtxqueue.c
1 /* RTP Retransmission queue element for GStreamer
2  *
3  * gstrtprtxqueue.c:
4  *
5  * Copyright (C) 2013 Wim Taymans <wim.taymans@gmail.com>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 /**
24  * SECTION:element-rtprtxqueue
25  */
26
27 #ifdef HAVE_CONFIG_H
28 #include "config.h"
29 #endif
30
31 #include <gst/gst.h>
32 #include <gst/rtp/gstrtpbuffer.h>
33 #include <string.h>
34
35 #include "gstrtprtxqueue.h"
36
37 GST_DEBUG_CATEGORY_STATIC (gst_rtp_rtx_queue_debug);
38 #define GST_CAT_DEFAULT gst_rtp_rtx_queue_debug
39
40 #define DEFAULT_MAX_SIZE_TIME    0
41 #define DEFAULT_MAX_SIZE_PACKETS 100
42
43 enum
44 {
45   PROP_0,
46   PROP_MAX_SIZE_TIME,
47   PROP_MAX_SIZE_PACKETS,
48   PROP_LAST
49 };
50
51 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
52     GST_PAD_SRC,
53     GST_PAD_ALWAYS,
54     GST_STATIC_CAPS ("application/x-rtp")
55     );
56
57 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink",
58     GST_PAD_SINK,
59     GST_PAD_ALWAYS,
60     GST_STATIC_CAPS ("application/x-rtp")
61     );
62
63 static gboolean gst_rtp_rtx_queue_src_event (GstPad * pad, GstObject * parent,
64     GstEvent * event);
65 static GstFlowReturn gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent,
66     GstBuffer * buffer);
67
68 static GstStateChangeReturn gst_rtp_rtx_queue_change_state (GstElement *
69     element, GstStateChange transition);
70
71 static void gst_rtp_rtx_queue_set_property (GObject * object, guint prop_id,
72     const GValue * value, GParamSpec * pspec);
73 static void gst_rtp_rtx_queue_get_property (GObject * object, guint prop_id,
74     GValue * value, GParamSpec * pspec);
75 static void gst_rtp_rtx_queue_finalize (GObject * object);
76
77 G_DEFINE_TYPE (GstRTPRtxQueue, gst_rtp_rtx_queue, GST_TYPE_ELEMENT);
78
79 static void
80 gst_rtp_rtx_queue_class_init (GstRTPRtxQueueClass * klass)
81 {
82   GObjectClass *gobject_class;
83   GstElementClass *gstelement_class;
84
85   gobject_class = (GObjectClass *) klass;
86   gstelement_class = (GstElementClass *) klass;
87
88   gobject_class->get_property = gst_rtp_rtx_queue_get_property;
89   gobject_class->set_property = gst_rtp_rtx_queue_set_property;
90   gobject_class->finalize = gst_rtp_rtx_queue_finalize;
91
92   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
93       g_param_spec_uint ("max-size-time", "Max Size Times",
94           "Amount of ms to queue (0 = unlimited)", 0, G_MAXUINT,
95           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
96
97   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_PACKETS,
98       g_param_spec_uint ("max-size-packets", "Max Size Packets",
99           "Amount of packets to queue (0 = unlimited)", 0, G_MAXUINT,
100           DEFAULT_MAX_SIZE_PACKETS,
101           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
102
103   gst_element_class_add_pad_template (gstelement_class,
104       gst_static_pad_template_get (&src_factory));
105   gst_element_class_add_pad_template (gstelement_class,
106       gst_static_pad_template_get (&sink_factory));
107
108   gst_element_class_set_static_metadata (gstelement_class,
109       "RTP Retransmission Queue", "Codec",
110       "Keep RTP packets in a queue for retransmission",
111       "Wim Taymans <wim.taymans@gmail.com>");
112
113   gstelement_class->change_state =
114       GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_change_state);
115 }
116
117 static void
118 gst_rtp_rtx_queue_reset (GstRTPRtxQueue * rtx, gboolean full)
119 {
120   g_mutex_lock (&rtx->lock);
121   g_queue_foreach (rtx->queue, (GFunc) gst_buffer_unref, NULL);
122   g_queue_clear (rtx->queue);
123   g_list_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL);
124   g_list_free (rtx->pending);
125   rtx->pending = NULL;
126   g_mutex_unlock (&rtx->lock);
127 }
128
129 static void
130 gst_rtp_rtx_queue_finalize (GObject * object)
131 {
132   GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (object);
133
134   gst_rtp_rtx_queue_reset (rtx, TRUE);
135   g_queue_free (rtx->queue);
136   g_mutex_clear (&rtx->lock);
137
138   G_OBJECT_CLASS (gst_rtp_rtx_queue_parent_class)->finalize (object);
139 }
140
141 static void
142 gst_rtp_rtx_queue_init (GstRTPRtxQueue * rtx)
143 {
144   GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx);
145
146   rtx->srcpad =
147       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
148           "src"), "src");
149   GST_PAD_SET_PROXY_CAPS (rtx->srcpad);
150   GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad);
151   gst_pad_set_event_function (rtx->srcpad,
152       GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_src_event));
153   gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad);
154
155   rtx->sinkpad =
156       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
157           "sink"), "sink");
158   GST_PAD_SET_PROXY_CAPS (rtx->sinkpad);
159   GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
160   gst_pad_set_chain_function (rtx->sinkpad,
161       GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain));
162   gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
163
164   rtx->queue = g_queue_new ();
165   g_mutex_init (&rtx->lock);
166
167   rtx->max_size_time = DEFAULT_MAX_SIZE_TIME;
168   rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS;
169 }
170
171 typedef struct
172 {
173   GstRTPRtxQueue *rtx;
174   guint seqnum;
175   gboolean found;
176 } RTXData;
177
178 static void
179 push_seqnum (GstBuffer * buffer, RTXData * data)
180 {
181   GstRTPRtxQueue *rtx = data->rtx;
182   GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
183   guint16 seqnum;
184
185   if (data->found)
186     return;
187
188   if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpbuffer))
189     return;
190
191   seqnum = gst_rtp_buffer_get_seq (&rtpbuffer);
192   gst_rtp_buffer_unmap (&rtpbuffer);
193
194   if (seqnum == data->seqnum) {
195     data->found = TRUE;
196     GST_DEBUG_OBJECT (rtx, "found %d", seqnum);
197     rtx->pending = g_list_prepend (rtx->pending, gst_buffer_ref (buffer));
198   }
199 }
200
201 static gboolean
202 gst_rtp_rtx_queue_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
203 {
204   GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (parent);
205   gboolean res;
206
207   switch (GST_EVENT_TYPE (event)) {
208     case GST_EVENT_CUSTOM_UPSTREAM:
209     {
210       const GstStructure *s;
211
212       s = gst_event_get_structure (event);
213       if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
214         guint seqnum;
215         RTXData data;
216
217         if (!gst_structure_get_uint (s, "seqnum", &seqnum))
218           seqnum = -1;
219
220         GST_DEBUG_OBJECT (rtx, "request %d", seqnum);
221
222         g_mutex_lock (&rtx->lock);
223         data.rtx = rtx;
224         data.seqnum = seqnum;
225         data.found = FALSE;
226         g_queue_foreach (rtx->queue, (GFunc) push_seqnum, &data);
227         g_mutex_unlock (&rtx->lock);
228
229         gst_event_unref (event);
230         res = TRUE;
231       } else {
232         res = gst_pad_event_default (pad, parent, event);
233       }
234       break;
235     }
236     default:
237       res = gst_pad_event_default (pad, parent, event);
238       break;
239   }
240   return res;
241 }
242
243 static void
244 do_push (GstBuffer * buffer, GstRTPRtxQueue * rtx)
245 {
246   gst_pad_push (rtx->srcpad, buffer);
247 }
248
249 static GstFlowReturn
250 gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
251 {
252   GstRTPRtxQueue *rtx;
253   GstFlowReturn ret;
254   GList *pending;
255
256   rtx = GST_RTP_RTX_QUEUE (parent);
257
258   g_mutex_lock (&rtx->lock);
259   g_queue_push_head (rtx->queue, gst_buffer_ref (buffer));
260
261   if (rtx->max_size_packets) {
262     while (g_queue_get_length (rtx->queue) > rtx->max_size_packets)
263       gst_buffer_unref (g_queue_pop_tail (rtx->queue));
264   }
265
266   pending = rtx->pending;
267   rtx->pending = NULL;
268   g_mutex_unlock (&rtx->lock);
269
270   g_list_foreach (pending, (GFunc) do_push, rtx);
271   g_list_free (pending);
272
273   ret = gst_pad_push (rtx->srcpad, buffer);
274
275   return ret;
276 }
277
278 static void
279 gst_rtp_rtx_queue_get_property (GObject * object,
280     guint prop_id, GValue * value, GParamSpec * pspec)
281 {
282   GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (object);
283
284   switch (prop_id) {
285     case PROP_MAX_SIZE_TIME:
286       g_value_set_uint (value, rtx->max_size_time);
287       break;
288     case PROP_MAX_SIZE_PACKETS:
289       g_value_set_uint (value, rtx->max_size_packets);
290       break;
291     default:
292       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
293       break;
294   }
295 }
296
297 static void
298 gst_rtp_rtx_queue_set_property (GObject * object,
299     guint prop_id, const GValue * value, GParamSpec * pspec)
300 {
301   GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (object);
302
303   switch (prop_id) {
304     case PROP_MAX_SIZE_TIME:
305       rtx->max_size_time = g_value_get_uint (value);
306       break;
307     case PROP_MAX_SIZE_PACKETS:
308       rtx->max_size_packets = g_value_get_uint (value);
309       break;
310     default:
311       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
312       break;
313   }
314 }
315
316 static GstStateChangeReturn
317 gst_rtp_rtx_queue_change_state (GstElement * element, GstStateChange transition)
318 {
319   GstStateChangeReturn ret;
320   GstRTPRtxQueue *rtx;
321
322   rtx = GST_RTP_RTX_QUEUE (element);
323
324   switch (transition) {
325     default:
326       break;
327   }
328
329   ret =
330       GST_ELEMENT_CLASS (gst_rtp_rtx_queue_parent_class)->change_state (element,
331       transition);
332
333   switch (transition) {
334     case GST_STATE_CHANGE_PAUSED_TO_READY:
335       gst_rtp_rtx_queue_reset (rtx, TRUE);
336       break;
337     default:
338       break;
339   }
340
341   return ret;
342 }
343
344 gboolean
345 gst_rtp_rtx_queue_plugin_init (GstPlugin * plugin)
346 {
347   GST_DEBUG_CATEGORY_INIT (gst_rtp_rtx_queue_debug, "rtprtxqueue", 0,
348       "rtp retransmission queue");
349
350   return gst_element_register (plugin, "rtprtxqueue", GST_RANK_NONE,
351       GST_TYPE_RTP_RTX_QUEUE);
352 }