23fcc2da55740e4df6843a5581b8a7fb4b6aa038
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtprtxqueue.c
1 /* RTP Retransmission queue element for GStreamer
2  *
3  * gstrtprtxqueue.c:
4  *
5  * Copyright (C) 2013 Wim Taymans <wim.taymans@gmail.com>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 /**
24  * SECTION:element-rtprtxqueue
25  */
26
27 #ifdef HAVE_CONFIG_H
28 #include "config.h"
29 #endif
30
31 #include <gst/gst.h>
32 #include <gst/rtp/gstrtpbuffer.h>
33 #include <string.h>
34
35 #include "gstrtprtxqueue.h"
36
37 GST_DEBUG_CATEGORY_STATIC (gst_rtp_rtx_queue_debug);
38 #define GST_CAT_DEFAULT gst_rtp_rtx_queue_debug
39
40 enum
41 {
42   PROP_0,
43   PROP_LAST
44 };
45
46 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
47     GST_PAD_SRC,
48     GST_PAD_ALWAYS,
49     GST_STATIC_CAPS ("application/x-rtp")
50     );
51
52 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink",
53     GST_PAD_SINK,
54     GST_PAD_ALWAYS,
55     GST_STATIC_CAPS ("application/x-rtp")
56     );
57
58 static gboolean gst_rtp_rtx_queue_src_event (GstPad * pad, GstObject * parent,
59     GstEvent * event);
60 static GstFlowReturn gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent,
61     GstBuffer * buffer);
62
63 static GstStateChangeReturn gst_rtp_rtx_queue_change_state (GstElement *
64     element, GstStateChange transition);
65
66 static void gst_rtp_rtx_queue_set_property (GObject * object, guint prop_id,
67     const GValue * value, GParamSpec * pspec);
68 static void gst_rtp_rtx_queue_get_property (GObject * object, guint prop_id,
69     GValue * value, GParamSpec * pspec);
70 static void gst_rtp_rtx_queue_finalize (GObject * object);
71
72 G_DEFINE_TYPE (GstRTPRtxQueue, gst_rtp_rtx_queue, GST_TYPE_ELEMENT);
73
74 static void
75 gst_rtp_rtx_queue_class_init (GstRTPRtxQueueClass * klass)
76 {
77   GObjectClass *gobject_class;
78   GstElementClass *gstelement_class;
79
80   gobject_class = (GObjectClass *) klass;
81   gstelement_class = (GstElementClass *) klass;
82
83   gst_element_class_add_pad_template (gstelement_class,
84       gst_static_pad_template_get (&src_factory));
85   gst_element_class_add_pad_template (gstelement_class,
86       gst_static_pad_template_get (&sink_factory));
87
88   gst_element_class_set_static_metadata (gstelement_class,
89       "RTP Retransmission Queue", "Codec",
90       "Keep RTP packets in a queue for retransmission",
91       "Wim Taymans <wim.taymans@gmail.com>");
92
93   gobject_class->get_property = gst_rtp_rtx_queue_get_property;
94   gobject_class->set_property = gst_rtp_rtx_queue_set_property;
95   gobject_class->finalize = gst_rtp_rtx_queue_finalize;
96
97   gstelement_class->change_state =
98       GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_change_state);
99 }
100
101 static void
102 gst_rtp_rtx_queue_reset (GstRTPRtxQueue * rtx, gboolean full)
103 {
104   g_mutex_lock (&rtx->lock);
105   g_queue_foreach (rtx->queue, (GFunc) gst_buffer_unref, NULL);
106   g_queue_clear (rtx->queue);
107   g_list_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL);
108   g_list_free (rtx->pending);
109   rtx->pending = NULL;
110   g_mutex_unlock (&rtx->lock);
111 }
112
113 static void
114 gst_rtp_rtx_queue_finalize (GObject * object)
115 {
116   GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (object);
117
118   gst_rtp_rtx_queue_reset (rtx, TRUE);
119   g_queue_free (rtx->queue);
120   g_mutex_clear (&rtx->lock);
121
122   G_OBJECT_CLASS (gst_rtp_rtx_queue_parent_class)->finalize (object);
123 }
124
125 static void
126 gst_rtp_rtx_queue_init (GstRTPRtxQueue * rtx)
127 {
128   GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx);
129
130   rtx->srcpad =
131       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
132           "src"), "src");
133   GST_PAD_SET_PROXY_CAPS (rtx->srcpad);
134   GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad);
135   gst_pad_set_event_function (rtx->srcpad,
136       GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_src_event));
137   gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad);
138
139   rtx->sinkpad =
140       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
141           "sink"), "sink");
142   GST_PAD_SET_PROXY_CAPS (rtx->sinkpad);
143   GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
144   gst_pad_set_chain_function (rtx->sinkpad,
145       GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain));
146   gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
147
148   rtx->queue = g_queue_new ();
149   g_mutex_init (&rtx->lock);
150 }
151
152 typedef struct
153 {
154   GstRTPRtxQueue *rtx;
155   guint seqnum;
156   gboolean found;
157 } RTXData;
158
159 static void
160 push_seqnum (GstBuffer * buffer, RTXData * data)
161 {
162   GstRTPRtxQueue *rtx = data->rtx;
163   GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
164   guint16 seqnum;
165
166   if (data->found)
167     return;
168
169   if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpbuffer))
170     return;
171
172   seqnum = gst_rtp_buffer_get_seq (&rtpbuffer);
173   gst_rtp_buffer_unmap (&rtpbuffer);
174
175   if (seqnum == data->seqnum) {
176     data->found = TRUE;
177     GST_DEBUG_OBJECT (rtx, "found %d", seqnum);
178     rtx->pending = g_list_prepend (rtx->pending, gst_buffer_ref (buffer));
179   }
180 }
181
182 static gboolean
183 gst_rtp_rtx_queue_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
184 {
185   GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (parent);
186   gboolean res;
187
188   switch (GST_EVENT_TYPE (event)) {
189     case GST_EVENT_CUSTOM_UPSTREAM:
190     {
191       const GstStructure *s;
192
193       s = gst_event_get_structure (event);
194       if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
195         guint seqnum;
196         RTXData data;
197
198         if (!gst_structure_get_uint (s, "seqnum", &seqnum))
199           seqnum = -1;
200
201         GST_DEBUG_OBJECT (rtx, "request %d", seqnum);
202
203         g_mutex_lock (&rtx->lock);
204         data.rtx = rtx;
205         data.seqnum = seqnum;
206         data.found = FALSE;
207         g_queue_foreach (rtx->queue, (GFunc) push_seqnum, &data);
208         g_mutex_unlock (&rtx->lock);
209
210         gst_event_unref (event);
211         res = TRUE;
212       } else {
213         res = gst_pad_event_default (pad, parent, event);
214       }
215       break;
216     }
217     default:
218       res = gst_pad_event_default (pad, parent, event);
219       break;
220   }
221   return res;
222 }
223
224 static void
225 do_push (GstBuffer * buffer, GstRTPRtxQueue * rtx)
226 {
227   gst_pad_push (rtx->srcpad, buffer);
228 }
229
230 static GstFlowReturn
231 gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
232 {
233   GstRTPRtxQueue *rtx;
234   GstFlowReturn ret;
235   GList *pending;
236
237   rtx = GST_RTP_RTX_QUEUE (parent);
238
239   g_mutex_lock (&rtx->lock);
240   g_queue_push_head (rtx->queue, gst_buffer_ref (buffer));
241   while (g_queue_get_length (rtx->queue) > 100) {
242     gst_buffer_unref (g_queue_pop_tail (rtx->queue));
243   }
244   pending = rtx->pending;
245   rtx->pending = NULL;
246   g_mutex_unlock (&rtx->lock);
247
248   g_list_foreach (pending, (GFunc) do_push, rtx);
249   g_list_free (pending);
250
251   ret = gst_pad_push (rtx->srcpad, buffer);
252
253   return ret;
254 }
255
256 static void
257 gst_rtp_rtx_queue_get_property (GObject * object,
258     guint prop_id, GValue * value, GParamSpec * pspec)
259 {
260   switch (prop_id) {
261     default:
262       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
263       break;
264   }
265 }
266
267 static void
268 gst_rtp_rtx_queue_set_property (GObject * object,
269     guint prop_id, const GValue * value, GParamSpec * pspec)
270 {
271   switch (prop_id) {
272     default:
273       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
274       break;
275   }
276 }
277
278 static GstStateChangeReturn
279 gst_rtp_rtx_queue_change_state (GstElement * element, GstStateChange transition)
280 {
281   GstStateChangeReturn ret;
282   GstRTPRtxQueue *rtx;
283
284   rtx = GST_RTP_RTX_QUEUE (element);
285
286   switch (transition) {
287     default:
288       break;
289   }
290
291   ret =
292       GST_ELEMENT_CLASS (gst_rtp_rtx_queue_parent_class)->change_state (element,
293       transition);
294
295   switch (transition) {
296     case GST_STATE_CHANGE_PAUSED_TO_READY:
297       gst_rtp_rtx_queue_reset (rtx, TRUE);
298       break;
299     default:
300       break;
301   }
302
303   return ret;
304 }
305
306 gboolean
307 gst_rtp_rtx_queue_plugin_init (GstPlugin * plugin)
308 {
309   GST_DEBUG_CATEGORY_INIT (gst_rtp_rtx_queue_debug, "rtprtxqueue", 0,
310       "rtp retransmission queue");
311
312   return gst_element_register (plugin, "rtprtxqueue", GST_RANK_NONE,
313       GST_TYPE_RTP_RTX_QUEUE);
314 }