rtxqueue: Expose basic statistics as properties.
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtprtxqueue.c
1 /* RTP Retransmission queue element for GStreamer
2  *
3  * gstrtprtxqueue.c:
4  *
5  * Copyright (C) 2013 Wim Taymans <wim.taymans@gmail.com>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 /**
24  * SECTION:element-rtprtxqueue
25  */
26
27 #ifdef HAVE_CONFIG_H
28 #include "config.h"
29 #endif
30
31 #include <gst/gst.h>
32 #include <gst/rtp/gstrtpbuffer.h>
33 #include <string.h>
34
35 #include "gstrtprtxqueue.h"
36
37 GST_DEBUG_CATEGORY_STATIC (gst_rtp_rtx_queue_debug);
38 #define GST_CAT_DEFAULT gst_rtp_rtx_queue_debug
39
40 #define DEFAULT_MAX_SIZE_TIME    0
41 #define DEFAULT_MAX_SIZE_PACKETS 100
42
43 enum
44 {
45   PROP_0,
46   PROP_MAX_SIZE_TIME,
47   PROP_MAX_SIZE_PACKETS,
48   PROP_REQUESTS,
49   PROP_FULFILLED_REQUESTS,
50 };
51
52 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
53     GST_PAD_SRC,
54     GST_PAD_ALWAYS,
55     GST_STATIC_CAPS ("application/x-rtp")
56     );
57
58 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink",
59     GST_PAD_SINK,
60     GST_PAD_ALWAYS,
61     GST_STATIC_CAPS ("application/x-rtp")
62     );
63
64 static gboolean gst_rtp_rtx_queue_src_event (GstPad * pad, GstObject * parent,
65     GstEvent * event);
66 static GstFlowReturn gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent,
67     GstBuffer * buffer);
68 static GstFlowReturn gst_rtp_rtx_queue_chain_list (GstPad * pad,
69     GstObject * parent, GstBufferList * list);
70
71 static GstStateChangeReturn gst_rtp_rtx_queue_change_state (GstElement *
72     element, GstStateChange transition);
73
74 static void gst_rtp_rtx_queue_set_property (GObject * object, guint prop_id,
75     const GValue * value, GParamSpec * pspec);
76 static void gst_rtp_rtx_queue_get_property (GObject * object, guint prop_id,
77     GValue * value, GParamSpec * pspec);
78 static void gst_rtp_rtx_queue_finalize (GObject * object);
79
80 G_DEFINE_TYPE (GstRTPRtxQueue, gst_rtp_rtx_queue, GST_TYPE_ELEMENT);
81
82 static void
83 gst_rtp_rtx_queue_class_init (GstRTPRtxQueueClass * klass)
84 {
85   GObjectClass *gobject_class;
86   GstElementClass *gstelement_class;
87
88   gobject_class = (GObjectClass *) klass;
89   gstelement_class = (GstElementClass *) klass;
90
91   gobject_class->get_property = gst_rtp_rtx_queue_get_property;
92   gobject_class->set_property = gst_rtp_rtx_queue_set_property;
93   gobject_class->finalize = gst_rtp_rtx_queue_finalize;
94
95   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
96       g_param_spec_uint ("max-size-time", "Max Size Times",
97           "Amount of ms to queue (0 = unlimited)", 0, G_MAXUINT,
98           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
99
100   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_PACKETS,
101       g_param_spec_uint ("max-size-packets", "Max Size Packets",
102           "Amount of packets to queue (0 = unlimited)", 0, G_MAXUINT,
103           DEFAULT_MAX_SIZE_PACKETS,
104           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
105
106   g_object_class_install_property (gobject_class, PROP_REQUESTS,
107       g_param_spec_uint ("requests", "Requests",
108           "Total number of retransmission requests", 0, G_MAXUINT,
109           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
110
111   g_object_class_install_property (gobject_class, PROP_FULFILLED_REQUESTS,
112       g_param_spec_uint ("fulfilled-requests", "Fulfilled Requests",
113           "Number of fulfilled retransmission requests", 0, G_MAXUINT,
114           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
115
116   gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
117   gst_element_class_add_static_pad_template (gstelement_class, &sink_factory);
118
119   gst_element_class_set_static_metadata (gstelement_class,
120       "RTP Retransmission Queue", "Codec",
121       "Keep RTP packets in a queue for retransmission",
122       "Wim Taymans <wim.taymans@gmail.com>");
123
124   gstelement_class->change_state =
125       GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_change_state);
126 }
127
128 static void
129 gst_rtp_rtx_queue_reset (GstRTPRtxQueue * rtx, gboolean full)
130 {
131   g_mutex_lock (&rtx->lock);
132   g_queue_foreach (rtx->queue, (GFunc) gst_buffer_unref, NULL);
133   g_queue_clear (rtx->queue);
134   g_list_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL);
135   g_list_free (rtx->pending);
136   rtx->pending = NULL;
137   rtx->n_requests = 0;
138   rtx->n_fulfilled_requests = 0;
139   g_mutex_unlock (&rtx->lock);
140 }
141
142 static void
143 gst_rtp_rtx_queue_finalize (GObject * object)
144 {
145   GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (object);
146
147   gst_rtp_rtx_queue_reset (rtx, TRUE);
148   g_queue_free (rtx->queue);
149   g_mutex_clear (&rtx->lock);
150
151   G_OBJECT_CLASS (gst_rtp_rtx_queue_parent_class)->finalize (object);
152 }
153
154 static void
155 gst_rtp_rtx_queue_init (GstRTPRtxQueue * rtx)
156 {
157   GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx);
158
159   rtx->srcpad =
160       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
161           "src"), "src");
162   GST_PAD_SET_PROXY_CAPS (rtx->srcpad);
163   GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad);
164   gst_pad_set_event_function (rtx->srcpad,
165       GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_src_event));
166   gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad);
167
168   rtx->sinkpad =
169       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
170           "sink"), "sink");
171   GST_PAD_SET_PROXY_CAPS (rtx->sinkpad);
172   GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
173   gst_pad_set_chain_function (rtx->sinkpad,
174       GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain));
175   gst_pad_set_chain_list_function (rtx->sinkpad,
176       GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain_list));
177   gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
178
179   rtx->queue = g_queue_new ();
180   g_mutex_init (&rtx->lock);
181
182   rtx->max_size_time = DEFAULT_MAX_SIZE_TIME;
183   rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS;
184 }
185
186 typedef struct
187 {
188   GstRTPRtxQueue *rtx;
189   guint seqnum;
190   gboolean found;
191 } RTXData;
192
193 static void
194 push_seqnum (GstBuffer * buffer, RTXData * data)
195 {
196   GstRTPRtxQueue *rtx = data->rtx;
197   GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
198   guint16 seqnum;
199
200   if (data->found)
201     return;
202
203   if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpbuffer))
204     return;
205
206   seqnum = gst_rtp_buffer_get_seq (&rtpbuffer);
207   gst_rtp_buffer_unmap (&rtpbuffer);
208
209   if (seqnum == data->seqnum) {
210     data->found = TRUE;
211     GST_DEBUG_OBJECT (rtx, "found %d", seqnum);
212     rtx->pending = g_list_prepend (rtx->pending, gst_buffer_ref (buffer));
213   }
214 }
215
216 static gboolean
217 gst_rtp_rtx_queue_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
218 {
219   GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (parent);
220   gboolean res;
221
222   switch (GST_EVENT_TYPE (event)) {
223     case GST_EVENT_CUSTOM_UPSTREAM:
224     {
225       const GstStructure *s;
226
227       s = gst_event_get_structure (event);
228       if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
229         guint seqnum;
230         RTXData data;
231
232         if (!gst_structure_get_uint (s, "seqnum", &seqnum))
233           seqnum = -1;
234
235         GST_DEBUG_OBJECT (rtx, "request %d", seqnum);
236
237         g_mutex_lock (&rtx->lock);
238         data.rtx = rtx;
239         data.seqnum = seqnum;
240         data.found = FALSE;
241         rtx->n_requests += 1;
242         g_queue_foreach (rtx->queue, (GFunc) push_seqnum, &data);
243         g_mutex_unlock (&rtx->lock);
244
245         gst_event_unref (event);
246         res = TRUE;
247       } else {
248         res = gst_pad_event_default (pad, parent, event);
249       }
250       break;
251     }
252     default:
253       res = gst_pad_event_default (pad, parent, event);
254       break;
255   }
256   return res;
257 }
258
259 static void
260 do_push (GstBuffer * buffer, GstRTPRtxQueue * rtx)
261 {
262   rtx->n_fulfilled_requests += 1;
263   gst_pad_push (rtx->srcpad, buffer);
264 }
265
266 /* Must be called with rtx->lock */
267 static void
268 shrink_queue (GstRTPRtxQueue * rtx)
269 {
270   if (rtx->max_size_packets) {
271     while (g_queue_get_length (rtx->queue) > rtx->max_size_packets)
272       gst_buffer_unref (g_queue_pop_tail (rtx->queue));
273   }
274 }
275
276 static GstFlowReturn
277 gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
278 {
279   GstRTPRtxQueue *rtx;
280   GstFlowReturn ret;
281   GList *pending;
282
283   rtx = GST_RTP_RTX_QUEUE (parent);
284
285   g_mutex_lock (&rtx->lock);
286   g_queue_push_head (rtx->queue, gst_buffer_ref (buffer));
287   shrink_queue (rtx);
288
289   pending = rtx->pending;
290   rtx->pending = NULL;
291   g_mutex_unlock (&rtx->lock);
292
293   pending = g_list_reverse (pending);
294   g_list_foreach (pending, (GFunc) do_push, rtx);
295   g_list_free (pending);
296
297   ret = gst_pad_push (rtx->srcpad, buffer);
298
299   return ret;
300 }
301
302 static gboolean
303 push_to_queue (GstBuffer ** buffer, guint idx, gpointer user_data)
304 {
305   GQueue *queue = user_data;
306
307   g_queue_push_head (queue, gst_buffer_ref (*buffer));
308
309   return TRUE;
310 }
311
312 static GstFlowReturn
313 gst_rtp_rtx_queue_chain_list (GstPad * pad, GstObject * parent,
314     GstBufferList * list)
315 {
316   GstRTPRtxQueue *rtx;
317   GstFlowReturn ret;
318   GList *pending;
319
320   rtx = GST_RTP_RTX_QUEUE (parent);
321
322   g_mutex_lock (&rtx->lock);
323   gst_buffer_list_foreach (list, push_to_queue, rtx->queue);
324   shrink_queue (rtx);
325
326   pending = rtx->pending;
327   rtx->pending = NULL;
328   g_mutex_unlock (&rtx->lock);
329
330   pending = g_list_reverse (pending);
331   g_list_foreach (pending, (GFunc) do_push, rtx);
332   g_list_free (pending);
333
334   ret = gst_pad_push_list (rtx->srcpad, list);
335
336   return ret;
337 }
338
339 static void
340 gst_rtp_rtx_queue_get_property (GObject * object,
341     guint prop_id, GValue * value, GParamSpec * pspec)
342 {
343   GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (object);
344
345   switch (prop_id) {
346     case PROP_MAX_SIZE_TIME:
347       g_value_set_uint (value, rtx->max_size_time);
348       break;
349     case PROP_MAX_SIZE_PACKETS:
350       g_value_set_uint (value, rtx->max_size_packets);
351       break;
352     case PROP_REQUESTS:
353       g_value_set_uint (value, rtx->n_requests);
354       break;
355     case PROP_FULFILLED_REQUESTS:
356       g_value_set_uint (value, rtx->n_fulfilled_requests);
357       break;
358     default:
359       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
360       break;
361   }
362 }
363
364 static void
365 gst_rtp_rtx_queue_set_property (GObject * object,
366     guint prop_id, const GValue * value, GParamSpec * pspec)
367 {
368   GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (object);
369
370   switch (prop_id) {
371     case PROP_MAX_SIZE_TIME:
372       rtx->max_size_time = g_value_get_uint (value);
373       break;
374     case PROP_MAX_SIZE_PACKETS:
375       rtx->max_size_packets = g_value_get_uint (value);
376       break;
377     default:
378       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
379       break;
380   }
381 }
382
383 static GstStateChangeReturn
384 gst_rtp_rtx_queue_change_state (GstElement * element, GstStateChange transition)
385 {
386   GstStateChangeReturn ret;
387   GstRTPRtxQueue *rtx;
388
389   rtx = GST_RTP_RTX_QUEUE (element);
390
391   switch (transition) {
392     default:
393       break;
394   }
395
396   ret =
397       GST_ELEMENT_CLASS (gst_rtp_rtx_queue_parent_class)->change_state (element,
398       transition);
399
400   switch (transition) {
401     case GST_STATE_CHANGE_PAUSED_TO_READY:
402       gst_rtp_rtx_queue_reset (rtx, TRUE);
403       break;
404     default:
405       break;
406   }
407
408   return ret;
409 }
410
411 gboolean
412 gst_rtp_rtx_queue_plugin_init (GstPlugin * plugin)
413 {
414   GST_DEBUG_CATEGORY_INIT (gst_rtp_rtx_queue_debug, "rtprtxqueue", 0,
415       "rtp retransmission queue");
416
417   return gst_element_register (plugin, "rtprtxqueue", GST_RANK_NONE,
418       GST_TYPE_RTP_RTX_QUEUE);
419 }