rtprtxqueue: reverse pending list before pushing buffers
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtprtxqueue.c
1 /* RTP Retransmission queue element for GStreamer
2  *
3  * gstrtprtxqueue.c:
4  *
5  * Copyright (C) 2013 Wim Taymans <wim.taymans@gmail.com>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 /**
24  * SECTION:element-rtprtxqueue
25  */
26
27 #ifdef HAVE_CONFIG_H
28 #include "config.h"
29 #endif
30
31 #include <gst/gst.h>
32 #include <gst/rtp/gstrtpbuffer.h>
33 #include <string.h>
34
35 #include "gstrtprtxqueue.h"
36
37 GST_DEBUG_CATEGORY_STATIC (gst_rtp_rtx_queue_debug);
38 #define GST_CAT_DEFAULT gst_rtp_rtx_queue_debug
39
40 #define DEFAULT_MAX_SIZE_TIME    0
41 #define DEFAULT_MAX_SIZE_PACKETS 100
42
43 enum
44 {
45   PROP_0,
46   PROP_MAX_SIZE_TIME,
47   PROP_MAX_SIZE_PACKETS
48 };
49
50 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
51     GST_PAD_SRC,
52     GST_PAD_ALWAYS,
53     GST_STATIC_CAPS ("application/x-rtp")
54     );
55
56 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink",
57     GST_PAD_SINK,
58     GST_PAD_ALWAYS,
59     GST_STATIC_CAPS ("application/x-rtp")
60     );
61
62 static gboolean gst_rtp_rtx_queue_src_event (GstPad * pad, GstObject * parent,
63     GstEvent * event);
64 static GstFlowReturn gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent,
65     GstBuffer * buffer);
66 static GstFlowReturn gst_rtp_rtx_queue_chain_list (GstPad * pad,
67     GstObject * parent, GstBufferList * list);
68
69 static GstStateChangeReturn gst_rtp_rtx_queue_change_state (GstElement *
70     element, GstStateChange transition);
71
72 static void gst_rtp_rtx_queue_set_property (GObject * object, guint prop_id,
73     const GValue * value, GParamSpec * pspec);
74 static void gst_rtp_rtx_queue_get_property (GObject * object, guint prop_id,
75     GValue * value, GParamSpec * pspec);
76 static void gst_rtp_rtx_queue_finalize (GObject * object);
77
78 G_DEFINE_TYPE (GstRTPRtxQueue, gst_rtp_rtx_queue, GST_TYPE_ELEMENT);
79
80 static void
81 gst_rtp_rtx_queue_class_init (GstRTPRtxQueueClass * klass)
82 {
83   GObjectClass *gobject_class;
84   GstElementClass *gstelement_class;
85
86   gobject_class = (GObjectClass *) klass;
87   gstelement_class = (GstElementClass *) klass;
88
89   gobject_class->get_property = gst_rtp_rtx_queue_get_property;
90   gobject_class->set_property = gst_rtp_rtx_queue_set_property;
91   gobject_class->finalize = gst_rtp_rtx_queue_finalize;
92
93   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
94       g_param_spec_uint ("max-size-time", "Max Size Times",
95           "Amount of ms to queue (0 = unlimited)", 0, G_MAXUINT,
96           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
97
98   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_PACKETS,
99       g_param_spec_uint ("max-size-packets", "Max Size Packets",
100           "Amount of packets to queue (0 = unlimited)", 0, G_MAXUINT,
101           DEFAULT_MAX_SIZE_PACKETS,
102           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
103
104   gst_element_class_add_pad_template (gstelement_class,
105       gst_static_pad_template_get (&src_factory));
106   gst_element_class_add_pad_template (gstelement_class,
107       gst_static_pad_template_get (&sink_factory));
108
109   gst_element_class_set_static_metadata (gstelement_class,
110       "RTP Retransmission Queue", "Codec",
111       "Keep RTP packets in a queue for retransmission",
112       "Wim Taymans <wim.taymans@gmail.com>");
113
114   gstelement_class->change_state =
115       GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_change_state);
116 }
117
118 static void
119 gst_rtp_rtx_queue_reset (GstRTPRtxQueue * rtx, gboolean full)
120 {
121   g_mutex_lock (&rtx->lock);
122   g_queue_foreach (rtx->queue, (GFunc) gst_buffer_unref, NULL);
123   g_queue_clear (rtx->queue);
124   g_list_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL);
125   g_list_free (rtx->pending);
126   rtx->pending = NULL;
127   g_mutex_unlock (&rtx->lock);
128 }
129
130 static void
131 gst_rtp_rtx_queue_finalize (GObject * object)
132 {
133   GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (object);
134
135   gst_rtp_rtx_queue_reset (rtx, TRUE);
136   g_queue_free (rtx->queue);
137   g_mutex_clear (&rtx->lock);
138
139   G_OBJECT_CLASS (gst_rtp_rtx_queue_parent_class)->finalize (object);
140 }
141
142 static void
143 gst_rtp_rtx_queue_init (GstRTPRtxQueue * rtx)
144 {
145   GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx);
146
147   rtx->srcpad =
148       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
149           "src"), "src");
150   GST_PAD_SET_PROXY_CAPS (rtx->srcpad);
151   GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad);
152   gst_pad_set_event_function (rtx->srcpad,
153       GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_src_event));
154   gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad);
155
156   rtx->sinkpad =
157       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
158           "sink"), "sink");
159   GST_PAD_SET_PROXY_CAPS (rtx->sinkpad);
160   GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
161   gst_pad_set_chain_function (rtx->sinkpad,
162       GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain));
163   gst_pad_set_chain_list_function (rtx->sinkpad,
164       GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain_list));
165   gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
166
167   rtx->queue = g_queue_new ();
168   g_mutex_init (&rtx->lock);
169
170   rtx->max_size_time = DEFAULT_MAX_SIZE_TIME;
171   rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS;
172 }
173
174 typedef struct
175 {
176   GstRTPRtxQueue *rtx;
177   guint seqnum;
178   gboolean found;
179 } RTXData;
180
181 static void
182 push_seqnum (GstBuffer * buffer, RTXData * data)
183 {
184   GstRTPRtxQueue *rtx = data->rtx;
185   GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
186   guint16 seqnum;
187
188   if (data->found)
189     return;
190
191   if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpbuffer))
192     return;
193
194   seqnum = gst_rtp_buffer_get_seq (&rtpbuffer);
195   gst_rtp_buffer_unmap (&rtpbuffer);
196
197   if (seqnum == data->seqnum) {
198     data->found = TRUE;
199     GST_DEBUG_OBJECT (rtx, "found %d", seqnum);
200     rtx->pending = g_list_prepend (rtx->pending, gst_buffer_ref (buffer));
201   }
202 }
203
204 static gboolean
205 gst_rtp_rtx_queue_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
206 {
207   GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (parent);
208   gboolean res;
209
210   switch (GST_EVENT_TYPE (event)) {
211     case GST_EVENT_CUSTOM_UPSTREAM:
212     {
213       const GstStructure *s;
214
215       s = gst_event_get_structure (event);
216       if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
217         guint seqnum;
218         RTXData data;
219
220         if (!gst_structure_get_uint (s, "seqnum", &seqnum))
221           seqnum = -1;
222
223         GST_DEBUG_OBJECT (rtx, "request %d", seqnum);
224
225         g_mutex_lock (&rtx->lock);
226         data.rtx = rtx;
227         data.seqnum = seqnum;
228         data.found = FALSE;
229         g_queue_foreach (rtx->queue, (GFunc) push_seqnum, &data);
230         g_mutex_unlock (&rtx->lock);
231
232         gst_event_unref (event);
233         res = TRUE;
234       } else {
235         res = gst_pad_event_default (pad, parent, event);
236       }
237       break;
238     }
239     default:
240       res = gst_pad_event_default (pad, parent, event);
241       break;
242   }
243   return res;
244 }
245
246 static void
247 do_push (GstBuffer * buffer, GstRTPRtxQueue * rtx)
248 {
249   gst_pad_push (rtx->srcpad, buffer);
250 }
251
252 /* Must be called with rtx->lock */
253 static void
254 shrink_queue (GstRTPRtxQueue * rtx)
255 {
256   if (rtx->max_size_packets) {
257     while (g_queue_get_length (rtx->queue) > rtx->max_size_packets)
258       gst_buffer_unref (g_queue_pop_tail (rtx->queue));
259   }
260 }
261
262 static GstFlowReturn
263 gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
264 {
265   GstRTPRtxQueue *rtx;
266   GstFlowReturn ret;
267   GList *pending;
268
269   rtx = GST_RTP_RTX_QUEUE (parent);
270
271   g_mutex_lock (&rtx->lock);
272   g_queue_push_head (rtx->queue, gst_buffer_ref (buffer));
273   shrink_queue (rtx);
274
275   pending = rtx->pending;
276   rtx->pending = NULL;
277   g_mutex_unlock (&rtx->lock);
278
279   pending = g_list_reverse (pending);
280   g_list_foreach (pending, (GFunc) do_push, rtx);
281   g_list_free (pending);
282
283   ret = gst_pad_push (rtx->srcpad, buffer);
284
285   return ret;
286 }
287
288 static gboolean
289 push_to_queue (GstBuffer ** buffer, guint idx, gpointer user_data)
290 {
291   GQueue *queue = user_data;
292
293   g_queue_push_head (queue, gst_buffer_ref (*buffer));
294
295   return TRUE;
296 }
297
298 static GstFlowReturn
299 gst_rtp_rtx_queue_chain_list (GstPad * pad, GstObject * parent,
300     GstBufferList * list)
301 {
302   GstRTPRtxQueue *rtx;
303   GstFlowReturn ret;
304   GList *pending;
305
306   rtx = GST_RTP_RTX_QUEUE (parent);
307
308   g_mutex_lock (&rtx->lock);
309   gst_buffer_list_foreach (list, push_to_queue, rtx->queue);
310   shrink_queue (rtx);
311
312   pending = rtx->pending;
313   rtx->pending = NULL;
314   g_mutex_unlock (&rtx->lock);
315
316   pending = g_list_reverse (pending);
317   g_list_foreach (pending, (GFunc) do_push, rtx);
318   g_list_free (pending);
319
320   ret = gst_pad_push_list (rtx->srcpad, list);
321
322   return ret;
323 }
324
325 static void
326 gst_rtp_rtx_queue_get_property (GObject * object,
327     guint prop_id, GValue * value, GParamSpec * pspec)
328 {
329   GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (object);
330
331   switch (prop_id) {
332     case PROP_MAX_SIZE_TIME:
333       g_value_set_uint (value, rtx->max_size_time);
334       break;
335     case PROP_MAX_SIZE_PACKETS:
336       g_value_set_uint (value, rtx->max_size_packets);
337       break;
338     default:
339       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
340       break;
341   }
342 }
343
344 static void
345 gst_rtp_rtx_queue_set_property (GObject * object,
346     guint prop_id, const GValue * value, GParamSpec * pspec)
347 {
348   GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (object);
349
350   switch (prop_id) {
351     case PROP_MAX_SIZE_TIME:
352       rtx->max_size_time = g_value_get_uint (value);
353       break;
354     case PROP_MAX_SIZE_PACKETS:
355       rtx->max_size_packets = g_value_get_uint (value);
356       break;
357     default:
358       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
359       break;
360   }
361 }
362
363 static GstStateChangeReturn
364 gst_rtp_rtx_queue_change_state (GstElement * element, GstStateChange transition)
365 {
366   GstStateChangeReturn ret;
367   GstRTPRtxQueue *rtx;
368
369   rtx = GST_RTP_RTX_QUEUE (element);
370
371   switch (transition) {
372     default:
373       break;
374   }
375
376   ret =
377       GST_ELEMENT_CLASS (gst_rtp_rtx_queue_parent_class)->change_state (element,
378       transition);
379
380   switch (transition) {
381     case GST_STATE_CHANGE_PAUSED_TO_READY:
382       gst_rtp_rtx_queue_reset (rtx, TRUE);
383       break;
384     default:
385       break;
386   }
387
388   return ret;
389 }
390
391 gboolean
392 gst_rtp_rtx_queue_plugin_init (GstPlugin * plugin)
393 {
394   GST_DEBUG_CATEGORY_INIT (gst_rtp_rtx_queue_debug, "rtprtxqueue", 0,
395       "rtp retransmission queue");
396
397   return gst_element_register (plugin, "rtprtxqueue", GST_RANK_NONE,
398       GST_TYPE_RTP_RTX_QUEUE);
399 }