rtpmanager: Update codes based on 1.18.4
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtprtxqueue.c
1 /* RTP Retransmission queue element for GStreamer
2  *
3  * gstrtprtxqueue.c:
4  *
5  * Copyright (C) 2013 Wim Taymans <wim.taymans@gmail.com>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 /**
24  * SECTION:element-rtprtxqueue
25  * @title: rtprtxqueue
26  *
27  * rtprtxqueue maintains a queue of transmitted RTP packets, up to a
28  * configurable limit (see #GstRTPRtxQueue:max-size-time,
29  * #GstRTPRtxQueue:max-size-packets), and retransmits them upon request
30  * from the downstream rtpsession (GstRTPRetransmissionRequest event).
31  *
32  * This element is similar to rtprtxsend, but it has differences:
33  * - Retransmission from rtprtxqueue is not RFC 4588 compliant. The
34  * retransmitted packets have the same ssrc and payload type as the original
35  * stream.
36  * - As a side-effect of the above, rtprtxqueue does not require the use of
37  * rtprtxreceive on the receiving end. rtpjitterbuffer alone is able to
38  * reconstruct the stream.
39  * - Retransmission from rtprtxqueue happens as soon as the next regular flow
40  * packet is chained, while rtprtxsend retransmits as soon as the retransmission
41  * event is received, using a helper thread.
42  * - rtprtxqueue can be used with rtpbin without the need of hooking to its
43  * #GstRtpBin::request-aux-sender signal, which means it can be used with
44  * rtpbin using gst-launch.
45  *
46  * See also #GstRtpRtxSend, #GstRtpRtxReceive
47  *
48  * # Example pipelines
49  *
50  * |[
51  * gst-launch-1.0 rtpbin name=b rtp-profile=avpf \
52  *    audiotestsrc is-live=true ! opusenc ! rtpopuspay pt=96 ! rtprtxqueue ! b.send_rtp_sink_0 \
53  *    b.send_rtp_src_0 ! identity drop-probability=0.01 ! udpsink host="127.0.0.1" port=5000 \
54  *    udpsrc port=5001 ! b.recv_rtcp_sink_0 \
55  *    b.send_rtcp_src_0 ! udpsink host="127.0.0.1" port=5002 sync=false async=false
56  * ]|
57  * Sender pipeline
58  *
59  * |[
60  * gst-launch-1.0 rtpbin name=b rtp-profile=avpf do-retransmission=true \
61  *    udpsrc port=5000 caps="application/x-rtp,media=(string)audio,clock-rate=(int)48000,encoding-name=(string)OPUS,payload=(int)96" ! \
62  *        b.recv_rtp_sink_0 \
63  *    b. ! rtpopusdepay ! opusdec ! audioconvert ! audioresample ! autoaudiosink \
64  *    udpsrc port=5002 ! b.recv_rtcp_sink_0 \
65  *    b.send_rtcp_src_0 ! udpsink host="127.0.0.1" port=5001 sync=false async=false
66  * ]|
67  * Receiver pipeline
68  */
69
70 #ifdef HAVE_CONFIG_H
71 #include "config.h"
72 #endif
73
74 #include <gst/gst.h>
75 #include <gst/rtp/gstrtpbuffer.h>
76 #include <string.h>
77
78 #include "gstrtprtxqueue.h"
79
80 GST_DEBUG_CATEGORY_STATIC (gst_rtp_rtx_queue_debug);
81 #define GST_CAT_DEFAULT gst_rtp_rtx_queue_debug
82
83 #define DEFAULT_MAX_SIZE_TIME    0
84 #define DEFAULT_MAX_SIZE_PACKETS 100
85
86 enum
87 {
88   PROP_0,
89   PROP_MAX_SIZE_TIME,
90   PROP_MAX_SIZE_PACKETS,
91   PROP_REQUESTS,
92   PROP_FULFILLED_REQUESTS,
93 };
94
95 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
96     GST_PAD_SRC,
97     GST_PAD_ALWAYS,
98     GST_STATIC_CAPS ("application/x-rtp")
99     );
100
101 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink",
102     GST_PAD_SINK,
103     GST_PAD_ALWAYS,
104     GST_STATIC_CAPS ("application/x-rtp")
105     );
106
107 static gboolean gst_rtp_rtx_queue_src_event (GstPad * pad, GstObject * parent,
108     GstEvent * event);
109 static gboolean gst_rtp_rtx_queue_sink_event (GstPad * pad, GstObject * parent,
110     GstEvent * event);
111 static GstFlowReturn gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent,
112     GstBuffer * buffer);
113 static GstFlowReturn gst_rtp_rtx_queue_chain_list (GstPad * pad,
114     GstObject * parent, GstBufferList * list);
115
116 static GstStateChangeReturn gst_rtp_rtx_queue_change_state (GstElement *
117     element, GstStateChange transition);
118
119 static void gst_rtp_rtx_queue_set_property (GObject * object, guint prop_id,
120     const GValue * value, GParamSpec * pspec);
121 static void gst_rtp_rtx_queue_get_property (GObject * object, guint prop_id,
122     GValue * value, GParamSpec * pspec);
123 static void gst_rtp_rtx_queue_finalize (GObject * object);
124
125 G_DEFINE_TYPE (GstRTPRtxQueue, gst_rtp_rtx_queue, GST_TYPE_ELEMENT);
126
127 static void
128 gst_rtp_rtx_queue_class_init (GstRTPRtxQueueClass * klass)
129 {
130   GObjectClass *gobject_class;
131   GstElementClass *gstelement_class;
132
133   gobject_class = (GObjectClass *) klass;
134   gstelement_class = (GstElementClass *) klass;
135
136   gobject_class->get_property = gst_rtp_rtx_queue_get_property;
137   gobject_class->set_property = gst_rtp_rtx_queue_set_property;
138   gobject_class->finalize = gst_rtp_rtx_queue_finalize;
139
140   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
141       g_param_spec_uint ("max-size-time", "Max Size Times",
142           "Amount of ms to queue (0 = unlimited)", 0, G_MAXUINT,
143           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
144
145   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_PACKETS,
146       g_param_spec_uint ("max-size-packets", "Max Size Packets",
147           "Amount of packets to queue (0 = unlimited)", 0, G_MAXUINT,
148           DEFAULT_MAX_SIZE_PACKETS,
149           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
150
151   g_object_class_install_property (gobject_class, PROP_REQUESTS,
152       g_param_spec_uint ("requests", "Requests",
153           "Total number of retransmission requests", 0, G_MAXUINT,
154           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
155
156   g_object_class_install_property (gobject_class, PROP_FULFILLED_REQUESTS,
157       g_param_spec_uint ("fulfilled-requests", "Fulfilled Requests",
158           "Number of fulfilled retransmission requests", 0, G_MAXUINT,
159           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
160
161   gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
162   gst_element_class_add_static_pad_template (gstelement_class, &sink_factory);
163
164   gst_element_class_set_static_metadata (gstelement_class,
165       "RTP Retransmission Queue", "Codec",
166       "Keep RTP packets in a queue for retransmission",
167       "Wim Taymans <wim.taymans@gmail.com>");
168
169   gstelement_class->change_state =
170       GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_change_state);
171 }
172
173 static void
174 gst_rtp_rtx_queue_reset (GstRTPRtxQueue * rtx, gboolean full)
175 {
176   g_mutex_lock (&rtx->lock);
177   g_queue_foreach (rtx->queue, (GFunc) gst_buffer_unref, NULL);
178   g_queue_clear (rtx->queue);
179   g_list_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL);
180   g_list_free (rtx->pending);
181   rtx->pending = NULL;
182   rtx->n_requests = 0;
183   rtx->n_fulfilled_requests = 0;
184   g_mutex_unlock (&rtx->lock);
185 }
186
187 static void
188 gst_rtp_rtx_queue_finalize (GObject * object)
189 {
190   GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (object);
191
192   gst_rtp_rtx_queue_reset (rtx, TRUE);
193   g_queue_free (rtx->queue);
194   g_mutex_clear (&rtx->lock);
195
196   G_OBJECT_CLASS (gst_rtp_rtx_queue_parent_class)->finalize (object);
197 }
198
199 static void
200 gst_rtp_rtx_queue_init (GstRTPRtxQueue * rtx)
201 {
202   GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx);
203
204   rtx->srcpad =
205       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
206           "src"), "src");
207   GST_PAD_SET_PROXY_CAPS (rtx->srcpad);
208   GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad);
209   gst_pad_set_event_function (rtx->srcpad,
210       GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_src_event));
211   gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad);
212
213   rtx->sinkpad =
214       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
215           "sink"), "sink");
216   GST_PAD_SET_PROXY_CAPS (rtx->sinkpad);
217   GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
218   gst_pad_set_event_function (rtx->sinkpad,
219       GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_sink_event));
220   gst_pad_set_chain_function (rtx->sinkpad,
221       GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain));
222   gst_pad_set_chain_list_function (rtx->sinkpad,
223       GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain_list));
224   gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
225
226   rtx->queue = g_queue_new ();
227   g_mutex_init (&rtx->lock);
228
229   rtx->max_size_time = DEFAULT_MAX_SIZE_TIME;
230   rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS;
231 }
232
233 typedef struct
234 {
235   GstRTPRtxQueue *rtx;
236   guint seqnum;
237   gboolean found;
238 } RTXData;
239
240 static void
241 push_seqnum (GstBuffer * buffer, RTXData * data)
242 {
243   GstRTPRtxQueue *rtx = data->rtx;
244   GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
245   guint16 seqnum;
246
247   if (data->found)
248     return;
249
250   if (!GST_IS_BUFFER (buffer) ||
251       !gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpbuffer))
252     return;
253
254   seqnum = gst_rtp_buffer_get_seq (&rtpbuffer);
255   gst_rtp_buffer_unmap (&rtpbuffer);
256
257   if (seqnum == data->seqnum) {
258     data->found = TRUE;
259     GST_DEBUG_OBJECT (rtx, "found %d", seqnum);
260     rtx->pending = g_list_prepend (rtx->pending, gst_buffer_ref (buffer));
261   }
262 }
263
264 static gboolean
265 gst_rtp_rtx_queue_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
266 {
267   GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (parent);
268   gboolean res;
269
270   switch (GST_EVENT_TYPE (event)) {
271     case GST_EVENT_CUSTOM_UPSTREAM:
272     {
273       const GstStructure *s;
274
275       s = gst_event_get_structure (event);
276       if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
277         guint seqnum;
278         RTXData data;
279
280         if (!gst_structure_get_uint (s, "seqnum", &seqnum))
281           seqnum = -1;
282
283         GST_DEBUG_OBJECT (rtx, "request %d", seqnum);
284
285         g_mutex_lock (&rtx->lock);
286         data.rtx = rtx;
287         data.seqnum = seqnum;
288         data.found = FALSE;
289         rtx->n_requests += 1;
290         g_queue_foreach (rtx->queue, (GFunc) push_seqnum, &data);
291         g_mutex_unlock (&rtx->lock);
292
293         gst_event_unref (event);
294         res = TRUE;
295       } else {
296         res = gst_pad_event_default (pad, parent, event);
297       }
298       break;
299     }
300     default:
301       res = gst_pad_event_default (pad, parent, event);
302       break;
303   }
304   return res;
305 }
306
307 static gboolean
308 gst_rtp_rtx_queue_sink_event (GstPad * pad, GstObject * parent,
309     GstEvent * event)
310 {
311   GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (parent);
312   gboolean res;
313
314   switch (GST_EVENT_TYPE (event)) {
315     case GST_EVENT_SEGMENT:
316     {
317       g_mutex_lock (&rtx->lock);
318       gst_event_copy_segment (event, &rtx->head_segment);
319       g_queue_push_head (rtx->queue, gst_event_ref (event));
320       g_mutex_unlock (&rtx->lock);
321       /* fall through */
322     }
323     default:
324       res = gst_pad_event_default (pad, parent, event);
325       break;
326   }
327   return res;
328 }
329
330 static void
331 do_push (GstBuffer * buffer, GstRTPRtxQueue * rtx)
332 {
333   rtx->n_fulfilled_requests += 1;
334   gst_pad_push (rtx->srcpad, buffer);
335 }
336
337 static guint32
338 get_ts_diff (GstRTPRtxQueue * rtx)
339 {
340   GstClockTime high_ts, low_ts;
341   GstClockTimeDiff result;
342   GstBuffer *high_buf, *low_buf;
343
344   high_buf = g_queue_peek_head (rtx->queue);
345
346   while (GST_IS_EVENT ((low_buf = g_queue_peek_tail (rtx->queue)))) {
347     GstEvent *event = g_queue_pop_tail (rtx->queue);
348     gst_event_copy_segment (event, &rtx->tail_segment);
349     gst_event_unref (event);
350   }
351
352   if (!high_buf || !low_buf || high_buf == low_buf)
353     return 0;
354
355   high_ts = GST_BUFFER_TIMESTAMP (high_buf);
356   low_ts = GST_BUFFER_TIMESTAMP (low_buf);
357
358   high_ts = gst_segment_to_running_time (&rtx->head_segment, GST_FORMAT_TIME,
359       high_ts);
360   low_ts = gst_segment_to_running_time (&rtx->tail_segment, GST_FORMAT_TIME,
361       low_ts);
362
363   result = high_ts - low_ts;
364
365   /* return value in ms instead of ns */
366   return (guint32) gst_util_uint64_scale_int (result, 1, GST_MSECOND);
367 }
368
369 /* Must be called with rtx->lock */
370 static void
371 shrink_queue (GstRTPRtxQueue * rtx)
372 {
373   if (rtx->max_size_packets) {
374     while (g_queue_get_length (rtx->queue) > rtx->max_size_packets)
375       gst_buffer_unref (g_queue_pop_tail (rtx->queue));
376   }
377   if (rtx->max_size_time) {
378     while (get_ts_diff (rtx) > rtx->max_size_time)
379       gst_buffer_unref (g_queue_pop_tail (rtx->queue));
380   }
381 }
382
383 static GstFlowReturn
384 gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
385 {
386   GstRTPRtxQueue *rtx;
387   GstFlowReturn ret;
388   GList *pending;
389
390   rtx = GST_RTP_RTX_QUEUE (parent);
391
392   g_mutex_lock (&rtx->lock);
393   g_queue_push_head (rtx->queue, gst_buffer_ref (buffer));
394   shrink_queue (rtx);
395
396   pending = rtx->pending;
397   rtx->pending = NULL;
398   g_mutex_unlock (&rtx->lock);
399
400   pending = g_list_reverse (pending);
401   g_list_foreach (pending, (GFunc) do_push, rtx);
402   g_list_free (pending);
403
404   ret = gst_pad_push (rtx->srcpad, buffer);
405
406   return ret;
407 }
408
409 static gboolean
410 push_to_queue (GstBuffer ** buffer, guint idx, gpointer user_data)
411 {
412   GQueue *queue = user_data;
413
414   g_queue_push_head (queue, gst_buffer_ref (*buffer));
415
416   return TRUE;
417 }
418
419 static GstFlowReturn
420 gst_rtp_rtx_queue_chain_list (GstPad * pad, GstObject * parent,
421     GstBufferList * list)
422 {
423   GstRTPRtxQueue *rtx;
424   GstFlowReturn ret;
425   GList *pending;
426
427   rtx = GST_RTP_RTX_QUEUE (parent);
428
429   g_mutex_lock (&rtx->lock);
430   gst_buffer_list_foreach (list, push_to_queue, rtx->queue);
431   shrink_queue (rtx);
432
433   pending = rtx->pending;
434   rtx->pending = NULL;
435   g_mutex_unlock (&rtx->lock);
436
437   pending = g_list_reverse (pending);
438   g_list_foreach (pending, (GFunc) do_push, rtx);
439   g_list_free (pending);
440
441   ret = gst_pad_push_list (rtx->srcpad, list);
442
443   return ret;
444 }
445
446 static void
447 gst_rtp_rtx_queue_get_property (GObject * object,
448     guint prop_id, GValue * value, GParamSpec * pspec)
449 {
450   GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (object);
451
452   switch (prop_id) {
453     case PROP_MAX_SIZE_TIME:
454       g_value_set_uint (value, rtx->max_size_time);
455       break;
456     case PROP_MAX_SIZE_PACKETS:
457       g_value_set_uint (value, rtx->max_size_packets);
458       break;
459     case PROP_REQUESTS:
460       g_value_set_uint (value, rtx->n_requests);
461       break;
462     case PROP_FULFILLED_REQUESTS:
463       g_value_set_uint (value, rtx->n_fulfilled_requests);
464       break;
465     default:
466       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
467       break;
468   }
469 }
470
471 static void
472 gst_rtp_rtx_queue_set_property (GObject * object,
473     guint prop_id, const GValue * value, GParamSpec * pspec)
474 {
475   GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (object);
476
477   switch (prop_id) {
478     case PROP_MAX_SIZE_TIME:
479       rtx->max_size_time = g_value_get_uint (value);
480       break;
481     case PROP_MAX_SIZE_PACKETS:
482       rtx->max_size_packets = g_value_get_uint (value);
483       break;
484     default:
485       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
486       break;
487   }
488 }
489
490 static GstStateChangeReturn
491 gst_rtp_rtx_queue_change_state (GstElement * element, GstStateChange transition)
492 {
493   GstStateChangeReturn ret;
494   GstRTPRtxQueue *rtx;
495
496   rtx = GST_RTP_RTX_QUEUE (element);
497
498   switch (transition) {
499     default:
500       break;
501   }
502
503   ret =
504       GST_ELEMENT_CLASS (gst_rtp_rtx_queue_parent_class)->change_state (element,
505       transition);
506
507   switch (transition) {
508     case GST_STATE_CHANGE_PAUSED_TO_READY:
509       gst_rtp_rtx_queue_reset (rtx, TRUE);
510       break;
511     default:
512       break;
513   }
514
515   return ret;
516 }
517
518 gboolean
519 gst_rtp_rtx_queue_plugin_init (GstPlugin * plugin)
520 {
521   GST_DEBUG_CATEGORY_INIT (gst_rtp_rtx_queue_debug, "rtprtxqueue", 0,
522       "rtp retransmission queue");
523
524   return gst_element_register (plugin, "rtprtxqueue", GST_RANK_NONE,
525       GST_TYPE_RTP_RTX_QUEUE);
526 }