rtpmanager: Update codes based on 1.18.4
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtprtxsend.c
1 /* RTP Retransmission sender element for GStreamer
2  *
3  * gstrtprtxsend.c:
4  *
5  * Copyright (C) 2013 Collabora Ltd.
6  *   @author Julien Isorce <julien.isorce@collabora.co.uk>
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23
24 /**
25  * SECTION:element-rtprtxsend
26  * @title: rtprtxsend
27  *
28  * See #GstRtpRtxReceive for examples
29  *
30  * The purpose of the sender RTX object is to keep a history of RTP packets up
31  * to a configurable limit (max-size-time or max-size-packets). It will listen
32  * for upstream custom retransmission events (GstRTPRetransmissionRequest) that
33  * comes from downstream (#GstRtpSession). When receiving a request it will
34  * look up the requested seqnum in its list of stored packets. If the packet
35  * is available, it will create a RTX packet according to RFC 4588 and send
36  * this as an auxiliary stream. RTX is SSRC-multiplexed
37  */
38
39 #ifdef HAVE_CONFIG_H
40 #include "config.h"
41 #endif
42
43 #include <gst/gst.h>
44 #include <gst/rtp/gstrtpbuffer.h>
45 #include <string.h>
46 #include <stdlib.h>
47
48 #include "gstrtprtxsend.h"
49
50 GST_DEBUG_CATEGORY_STATIC (gst_rtp_rtx_send_debug);
51 #define GST_CAT_DEFAULT gst_rtp_rtx_send_debug
52
53 #define DEFAULT_RTX_PAYLOAD_TYPE 0
54 #define DEFAULT_MAX_SIZE_TIME    0
55 #define DEFAULT_MAX_SIZE_PACKETS 100
56
57 enum
58 {
59   PROP_0,
60   PROP_SSRC_MAP,
61   PROP_PAYLOAD_TYPE_MAP,
62   PROP_MAX_SIZE_TIME,
63   PROP_MAX_SIZE_PACKETS,
64   PROP_NUM_RTX_REQUESTS,
65   PROP_NUM_RTX_PACKETS,
66   PROP_CLOCK_RATE_MAP,
67 };
68
69 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
70     GST_PAD_SRC,
71     GST_PAD_ALWAYS,
72     GST_STATIC_CAPS ("application/x-rtp")
73     );
74
75 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink",
76     GST_PAD_SINK,
77     GST_PAD_ALWAYS,
78     GST_STATIC_CAPS ("application/x-rtp")
79     );
80
81 static gboolean gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue,
82     guint visible, guint bytes, guint64 time, gpointer checkdata);
83
84 static gboolean gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent,
85     GstEvent * event);
86 static gboolean gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent,
87     GstEvent * event);
88 static GstFlowReturn gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent,
89     GstBuffer * buffer);
90 static GstFlowReturn gst_rtp_rtx_send_chain_list (GstPad * pad,
91     GstObject * parent, GstBufferList * list);
92
93 static void gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx);
94 static gboolean gst_rtp_rtx_send_activate_mode (GstPad * pad,
95     GstObject * parent, GstPadMode mode, gboolean active);
96
97 static GstStateChangeReturn gst_rtp_rtx_send_change_state (GstElement *
98     element, GstStateChange transition);
99
100 static void gst_rtp_rtx_send_set_property (GObject * object, guint prop_id,
101     const GValue * value, GParamSpec * pspec);
102 static void gst_rtp_rtx_send_get_property (GObject * object, guint prop_id,
103     GValue * value, GParamSpec * pspec);
104 static void gst_rtp_rtx_send_finalize (GObject * object);
105
106 G_DEFINE_TYPE (GstRtpRtxSend, gst_rtp_rtx_send, GST_TYPE_ELEMENT);
107
108 typedef struct
109 {
110   guint16 seqnum;
111   guint32 timestamp;
112   GstBuffer *buffer;
113 } BufferQueueItem;
114
115 static void
116 buffer_queue_item_free (BufferQueueItem * item)
117 {
118   gst_buffer_unref (item->buffer);
119   g_slice_free (BufferQueueItem, item);
120 }
121
122 typedef struct
123 {
124   guint32 rtx_ssrc;
125   guint16 seqnum_base, next_seqnum;
126   gint clock_rate;
127
128   /* history of rtp packets */
129   GSequence *queue;
130 } SSRCRtxData;
131
132 static SSRCRtxData *
133 ssrc_rtx_data_new (guint32 rtx_ssrc)
134 {
135   SSRCRtxData *data = g_slice_new0 (SSRCRtxData);
136
137   data->rtx_ssrc = rtx_ssrc;
138   data->next_seqnum = data->seqnum_base = g_random_int_range (0, G_MAXUINT16);
139   data->queue = g_sequence_new ((GDestroyNotify) buffer_queue_item_free);
140
141   return data;
142 }
143
144 static void
145 ssrc_rtx_data_free (SSRCRtxData * data)
146 {
147   g_sequence_free (data->queue);
148   g_slice_free (SSRCRtxData, data);
149 }
150
151 static void
152 gst_rtp_rtx_send_class_init (GstRtpRtxSendClass * klass)
153 {
154   GObjectClass *gobject_class;
155   GstElementClass *gstelement_class;
156
157   gobject_class = (GObjectClass *) klass;
158   gstelement_class = (GstElementClass *) klass;
159
160   gobject_class->get_property = gst_rtp_rtx_send_get_property;
161   gobject_class->set_property = gst_rtp_rtx_send_set_property;
162   gobject_class->finalize = gst_rtp_rtx_send_finalize;
163
164   g_object_class_install_property (gobject_class, PROP_SSRC_MAP,
165       g_param_spec_boxed ("ssrc-map", "SSRC Map",
166           "Map of SSRCs to their retransmission SSRCs for SSRC-multiplexed mode"
167           " (default = random)", GST_TYPE_STRUCTURE,
168           G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS));
169
170   g_object_class_install_property (gobject_class, PROP_PAYLOAD_TYPE_MAP,
171       g_param_spec_boxed ("payload-type-map", "Payload Type Map",
172           "Map of original payload types to their retransmission payload types",
173           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
174
175   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
176       g_param_spec_uint ("max-size-time", "Max Size Time",
177           "Amount of ms to queue (0 = unlimited)", 0, G_MAXUINT,
178           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
179
180   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_PACKETS,
181       g_param_spec_uint ("max-size-packets", "Max Size Packets",
182           "Amount of packets to queue (0 = unlimited)", 0, G_MAXINT16,
183           DEFAULT_MAX_SIZE_PACKETS,
184           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
185
186   g_object_class_install_property (gobject_class, PROP_NUM_RTX_REQUESTS,
187       g_param_spec_uint ("num-rtx-requests", "Num RTX Requests",
188           "Number of retransmission events received", 0, G_MAXUINT,
189           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
190
191   g_object_class_install_property (gobject_class, PROP_NUM_RTX_PACKETS,
192       g_param_spec_uint ("num-rtx-packets", "Num RTX Packets",
193           " Number of retransmission packets sent", 0, G_MAXUINT,
194           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
195
196   g_object_class_install_property (gobject_class, PROP_CLOCK_RATE_MAP,
197       g_param_spec_boxed ("clock-rate-map", "Clock Rate Map",
198           "Map of payload types to their clock rates",
199           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
200
201   gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
202   gst_element_class_add_static_pad_template (gstelement_class, &sink_factory);
203
204   gst_element_class_set_static_metadata (gstelement_class,
205       "RTP Retransmission Sender", "Codec",
206       "Retransmit RTP packets when needed, according to RFC4588",
207       "Julien Isorce <julien.isorce@collabora.co.uk>");
208
209   gstelement_class->change_state =
210       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_change_state);
211 }
212
213 static void
214 gst_rtp_rtx_send_reset (GstRtpRtxSend * rtx)
215 {
216   GST_OBJECT_LOCK (rtx);
217   gst_data_queue_flush (rtx->queue);
218   g_hash_table_remove_all (rtx->ssrc_data);
219   g_hash_table_remove_all (rtx->rtx_ssrcs);
220   rtx->num_rtx_requests = 0;
221   rtx->num_rtx_packets = 0;
222   GST_OBJECT_UNLOCK (rtx);
223 }
224
225 static void
226 gst_rtp_rtx_send_finalize (GObject * object)
227 {
228   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (object);
229
230   g_hash_table_unref (rtx->ssrc_data);
231   g_hash_table_unref (rtx->rtx_ssrcs);
232   if (rtx->external_ssrc_map)
233     gst_structure_free (rtx->external_ssrc_map);
234   g_hash_table_unref (rtx->rtx_pt_map);
235   if (rtx->rtx_pt_map_structure)
236     gst_structure_free (rtx->rtx_pt_map_structure);
237   g_hash_table_unref (rtx->clock_rate_map);
238   if (rtx->clock_rate_map_structure)
239     gst_structure_free (rtx->clock_rate_map_structure);
240   g_object_unref (rtx->queue);
241
242   G_OBJECT_CLASS (gst_rtp_rtx_send_parent_class)->finalize (object);
243 }
244
245 static void
246 gst_rtp_rtx_send_init (GstRtpRtxSend * rtx)
247 {
248   GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx);
249
250   rtx->srcpad =
251       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
252           "src"), "src");
253   GST_PAD_SET_PROXY_CAPS (rtx->srcpad);
254   GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad);
255   gst_pad_set_event_function (rtx->srcpad,
256       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_src_event));
257   gst_pad_set_activatemode_function (rtx->srcpad,
258       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_activate_mode));
259   gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad);
260
261   rtx->sinkpad =
262       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
263           "sink"), "sink");
264   GST_PAD_SET_PROXY_CAPS (rtx->sinkpad);
265   GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
266   gst_pad_set_event_function (rtx->sinkpad,
267       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_sink_event));
268   gst_pad_set_chain_function (rtx->sinkpad,
269       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_chain));
270   gst_pad_set_chain_list_function (rtx->sinkpad,
271       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_chain_list));
272   gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
273
274   rtx->queue = gst_data_queue_new (gst_rtp_rtx_send_queue_check_full, NULL,
275       NULL, rtx);
276   rtx->ssrc_data = g_hash_table_new_full (g_direct_hash, g_direct_equal,
277       NULL, (GDestroyNotify) ssrc_rtx_data_free);
278   rtx->rtx_ssrcs = g_hash_table_new (g_direct_hash, g_direct_equal);
279   rtx->rtx_pt_map = g_hash_table_new (g_direct_hash, g_direct_equal);
280   rtx->clock_rate_map = g_hash_table_new (g_direct_hash, g_direct_equal);
281
282   rtx->max_size_time = DEFAULT_MAX_SIZE_TIME;
283   rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS;
284 }
285
286 static void
287 gst_rtp_rtx_send_set_flushing (GstRtpRtxSend * rtx, gboolean flush)
288 {
289   GST_OBJECT_LOCK (rtx);
290   gst_data_queue_set_flushing (rtx->queue, flush);
291   gst_data_queue_flush (rtx->queue);
292   GST_OBJECT_UNLOCK (rtx);
293 }
294
295 static gboolean
296 gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue,
297     guint visible, guint bytes, guint64 time, gpointer checkdata)
298 {
299   return FALSE;
300 }
301
302 static void
303 gst_rtp_rtx_data_queue_item_free (gpointer item)
304 {
305   GstDataQueueItem *data = item;
306   if (data->object)
307     gst_mini_object_unref (data->object);
308   g_slice_free (GstDataQueueItem, data);
309 }
310
311 static gboolean
312 gst_rtp_rtx_send_push_out (GstRtpRtxSend * rtx, gpointer object)
313 {
314   GstDataQueueItem *data;
315   gboolean success;
316
317   data = g_slice_new0 (GstDataQueueItem);
318   data->object = GST_MINI_OBJECT (object);
319   data->size = 1;
320   data->duration = 1;
321   data->visible = TRUE;
322   data->destroy = gst_rtp_rtx_data_queue_item_free;
323
324   success = gst_data_queue_push (rtx->queue, data);
325
326   if (!success)
327     data->destroy (data);
328
329   return success;
330 }
331
332 static guint32
333 gst_rtp_rtx_send_choose_ssrc (GstRtpRtxSend * rtx, guint32 choice,
334     gboolean consider_choice)
335 {
336   guint32 ssrc = consider_choice ? choice : g_random_int ();
337
338   /* make sure to be different than any other */
339   while (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc)) ||
340       g_hash_table_contains (rtx->rtx_ssrcs, GUINT_TO_POINTER (ssrc))) {
341     ssrc = g_random_int ();
342   }
343
344   return ssrc;
345 }
346
347 static SSRCRtxData *
348 gst_rtp_rtx_send_get_ssrc_data (GstRtpRtxSend * rtx, guint32 ssrc)
349 {
350   SSRCRtxData *data;
351   guint32 rtx_ssrc = 0;
352   gboolean consider = FALSE;
353
354   if (G_UNLIKELY (!g_hash_table_contains (rtx->ssrc_data,
355               GUINT_TO_POINTER (ssrc)))) {
356     if (rtx->external_ssrc_map) {
357       gchar *ssrc_str;
358       ssrc_str = g_strdup_printf ("%" G_GUINT32_FORMAT, ssrc);
359       consider = gst_structure_get_uint (rtx->external_ssrc_map, ssrc_str,
360           &rtx_ssrc);
361       g_free (ssrc_str);
362     }
363     rtx_ssrc = gst_rtp_rtx_send_choose_ssrc (rtx, rtx_ssrc, consider);
364     data = ssrc_rtx_data_new (rtx_ssrc);
365     g_hash_table_insert (rtx->ssrc_data, GUINT_TO_POINTER (ssrc), data);
366     g_hash_table_insert (rtx->rtx_ssrcs, GUINT_TO_POINTER (rtx_ssrc),
367         GUINT_TO_POINTER (ssrc));
368   } else {
369     data = g_hash_table_lookup (rtx->ssrc_data, GUINT_TO_POINTER (ssrc));
370   }
371   return data;
372 }
373
374 /* Copy fixed header and extension. Add OSN before to copy payload
375  * Copy memory to avoid to manually copy each rtp buffer field.
376  */
377 static GstBuffer *
378 gst_rtp_rtx_buffer_new (GstRtpRtxSend * rtx, GstBuffer * buffer)
379 {
380   GstMemory *mem = NULL;
381   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
382   GstRTPBuffer new_rtp = GST_RTP_BUFFER_INIT;
383   GstBuffer *new_buffer = gst_buffer_new ();
384   GstMapInfo map;
385   guint payload_len = 0;
386   SSRCRtxData *data;
387   guint32 ssrc;
388   guint16 seqnum;
389   guint8 fmtp;
390
391   gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
392
393   /* get needed data from GstRtpRtxSend */
394   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
395   data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
396   ssrc = data->rtx_ssrc;
397   seqnum = data->next_seqnum++;
398   fmtp = GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_pt_map,
399           GUINT_TO_POINTER (gst_rtp_buffer_get_payload_type (&rtp))));
400
401   GST_DEBUG_OBJECT (rtx, "creating rtx buffer, orig seqnum: %u, "
402       "rtx seqnum: %u, rtx ssrc: %X", gst_rtp_buffer_get_seq (&rtp),
403       seqnum, ssrc);
404
405   /* gst_rtp_buffer_map does not map the payload so do it now */
406   gst_rtp_buffer_get_payload (&rtp);
407
408   /* copy fixed header */
409   mem = gst_memory_copy (rtp.map[0].memory, 0, rtp.size[0]);
410   gst_buffer_append_memory (new_buffer, mem);
411
412   /* copy extension if any */
413   if (rtp.size[1]) {
414     mem = gst_allocator_alloc (NULL, rtp.size[1], NULL);
415     gst_memory_map (mem, &map, GST_MAP_WRITE);
416     memcpy (map.data, rtp.data[1], rtp.size[1]);
417     gst_memory_unmap (mem, &map);
418     gst_buffer_append_memory (new_buffer, mem);
419   }
420
421   /* copy payload and add OSN just before */
422   payload_len = 2 + rtp.size[2];
423   mem = gst_allocator_alloc (NULL, payload_len, NULL);
424
425   gst_memory_map (mem, &map, GST_MAP_WRITE);
426   GST_WRITE_UINT16_BE (map.data, gst_rtp_buffer_get_seq (&rtp));
427   if (rtp.size[2])
428     memcpy (map.data + 2, rtp.data[2], rtp.size[2]);
429   gst_memory_unmap (mem, &map);
430   gst_buffer_append_memory (new_buffer, mem);
431
432   /* everything needed is copied */
433   gst_rtp_buffer_unmap (&rtp);
434
435   /* set ssrc, seqnum and fmtp */
436   gst_rtp_buffer_map (new_buffer, GST_MAP_WRITE, &new_rtp);
437   gst_rtp_buffer_set_ssrc (&new_rtp, ssrc);
438   gst_rtp_buffer_set_seq (&new_rtp, seqnum);
439   gst_rtp_buffer_set_payload_type (&new_rtp, fmtp);
440   /* RFC 4588: let other elements do the padding, as normal */
441   gst_rtp_buffer_set_padding (&new_rtp, FALSE);
442   gst_rtp_buffer_unmap (&new_rtp);
443
444   /* Copy over timestamps */
445   gst_buffer_copy_into (new_buffer, buffer, GST_BUFFER_COPY_TIMESTAMPS, 0, -1);
446
447   return new_buffer;
448 }
449
450 static gint
451 buffer_queue_items_cmp (BufferQueueItem * a, BufferQueueItem * b,
452     gpointer user_data)
453 {
454   /* gst_rtp_buffer_compare_seqnum returns the opposite of what we want,
455    * it returns negative when seqnum1 > seqnum2 and we want negative
456    * when b > a, i.e. a is smaller, so it comes first in the sequence */
457   return gst_rtp_buffer_compare_seqnum (b->seqnum, a->seqnum);
458 }
459
460 static gboolean
461 gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
462 {
463   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
464   gboolean res;
465
466   switch (GST_EVENT_TYPE (event)) {
467     case GST_EVENT_CUSTOM_UPSTREAM:
468     {
469       const GstStructure *s = gst_event_get_structure (event);
470
471       /* This event usually comes from the downstream gstrtpsession */
472       if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
473         guint seqnum = 0;
474         guint ssrc = 0;
475         GstBuffer *rtx_buf = NULL;
476
477         /* retrieve seqnum of the packet that need to be retransmitted */
478         if (!gst_structure_get_uint (s, "seqnum", &seqnum))
479           seqnum = -1;
480
481         /* retrieve ssrc of the packet that need to be retransmitted */
482         if (!gst_structure_get_uint (s, "ssrc", &ssrc))
483           ssrc = -1;
484
485         GST_DEBUG_OBJECT (rtx, "got rtx request for seqnum: %u, ssrc: %X",
486             seqnum, ssrc);
487
488         GST_OBJECT_LOCK (rtx);
489         /* check if request is for us */
490         if (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc))) {
491           SSRCRtxData *data;
492           GSequenceIter *iter;
493           BufferQueueItem search_item;
494
495           /* update statistics */
496           ++rtx->num_rtx_requests;
497
498           data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
499
500           search_item.seqnum = seqnum;
501           iter = g_sequence_lookup (data->queue, &search_item,
502               (GCompareDataFunc) buffer_queue_items_cmp, NULL);
503           if (iter) {
504             BufferQueueItem *item = g_sequence_get (iter);
505             GST_LOG_OBJECT (rtx, "found %u", item->seqnum);
506             rtx_buf = gst_rtp_rtx_buffer_new (rtx, item->buffer);
507           }
508 #ifndef GST_DISABLE_DEBUG
509           else {
510             BufferQueueItem *item = NULL;
511
512             iter = g_sequence_get_begin_iter (data->queue);
513             if (!g_sequence_iter_is_end (iter))
514               item = g_sequence_get (iter);
515
516             if (item && seqnum < item->seqnum) {
517               GST_DEBUG_OBJECT (rtx, "requested seqnum %u has already been "
518                   "removed from the rtx queue; the first available is %u",
519                   seqnum, item->seqnum);
520             } else {
521               GST_WARNING_OBJECT (rtx, "requested seqnum %u has not been "
522                   "transmitted yet in the original stream; either the remote end "
523                   "is not configured correctly, or the source is too slow",
524                   seqnum);
525             }
526           }
527 #endif
528         }
529         GST_OBJECT_UNLOCK (rtx);
530
531         if (rtx_buf)
532           gst_rtp_rtx_send_push_out (rtx, rtx_buf);
533
534         gst_event_unref (event);
535         res = TRUE;
536
537         /* This event usually comes from the downstream gstrtpsession */
538       } else if (gst_structure_has_name (s, "GstRTPCollision")) {
539         guint ssrc = 0;
540
541         if (!gst_structure_get_uint (s, "ssrc", &ssrc))
542           ssrc = -1;
543
544         GST_DEBUG_OBJECT (rtx, "got ssrc collision, ssrc: %X", ssrc);
545
546         GST_OBJECT_LOCK (rtx);
547
548         /* choose another ssrc for our retransmitted stream */
549         if (g_hash_table_contains (rtx->rtx_ssrcs, GUINT_TO_POINTER (ssrc))) {
550           guint master_ssrc;
551           SSRCRtxData *data;
552
553           master_ssrc = GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_ssrcs,
554                   GUINT_TO_POINTER (ssrc)));
555           data = gst_rtp_rtx_send_get_ssrc_data (rtx, master_ssrc);
556
557           /* change rtx_ssrc and update the reverse map */
558           data->rtx_ssrc = gst_rtp_rtx_send_choose_ssrc (rtx, 0, FALSE);
559           g_hash_table_remove (rtx->rtx_ssrcs, GUINT_TO_POINTER (ssrc));
560           g_hash_table_insert (rtx->rtx_ssrcs,
561               GUINT_TO_POINTER (data->rtx_ssrc),
562               GUINT_TO_POINTER (master_ssrc));
563
564           GST_OBJECT_UNLOCK (rtx);
565
566           /* no need to forward to payloader because we make sure to have
567            * a different ssrc
568            */
569           gst_event_unref (event);
570           res = TRUE;
571         } else {
572           /* if master ssrc has collided, remove it from our data, as it
573            * is not going to be used any longer */
574           if (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc))) {
575             SSRCRtxData *data;
576             data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
577             g_hash_table_remove (rtx->rtx_ssrcs,
578                 GUINT_TO_POINTER (data->rtx_ssrc));
579             g_hash_table_remove (rtx->ssrc_data, GUINT_TO_POINTER (ssrc));
580           }
581
582           GST_OBJECT_UNLOCK (rtx);
583
584           /* forward event to payloader in case collided ssrc is
585            * master stream */
586           res = gst_pad_event_default (pad, parent, event);
587         }
588       } else {
589         res = gst_pad_event_default (pad, parent, event);
590       }
591       break;
592     }
593     default:
594       res = gst_pad_event_default (pad, parent, event);
595       break;
596   }
597   return res;
598 }
599
600 static gboolean
601 gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
602 {
603   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
604
605   switch (GST_EVENT_TYPE (event)) {
606     case GST_EVENT_FLUSH_START:
607       gst_pad_push_event (rtx->srcpad, event);
608       gst_rtp_rtx_send_set_flushing (rtx, TRUE);
609       gst_pad_pause_task (rtx->srcpad);
610       return TRUE;
611     case GST_EVENT_FLUSH_STOP:
612       gst_pad_push_event (rtx->srcpad, event);
613       gst_rtp_rtx_send_set_flushing (rtx, FALSE);
614       gst_pad_start_task (rtx->srcpad,
615           (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
616       return TRUE;
617     case GST_EVENT_EOS:
618       GST_INFO_OBJECT (rtx, "Got EOS - enqueueing it");
619       gst_rtp_rtx_send_push_out (rtx, event);
620       return TRUE;
621     case GST_EVENT_CAPS:
622     {
623       GstCaps *caps;
624       GstStructure *s;
625       guint ssrc;
626       gint payload;
627       gpointer rtx_payload;
628       SSRCRtxData *data;
629
630       gst_event_parse_caps (event, &caps);
631
632       s = gst_caps_get_structure (caps, 0);
633       if (!gst_structure_get_uint (s, "ssrc", &ssrc))
634         ssrc = -1;
635       if (!gst_structure_get_int (s, "payload", &payload))
636         payload = -1;
637
638       if (payload == -1 || ssrc == G_MAXUINT)
639         break;
640
641       if (payload == -1)
642         GST_WARNING_OBJECT (rtx, "No payload in caps");
643
644       GST_OBJECT_LOCK (rtx);
645       data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
646       if (!g_hash_table_lookup_extended (rtx->rtx_pt_map,
647               GUINT_TO_POINTER (payload), NULL, &rtx_payload))
648         rtx_payload = GINT_TO_POINTER (-1);
649
650       if (GPOINTER_TO_INT (rtx_payload) == -1 && payload != -1)
651         GST_WARNING_OBJECT (rtx, "Payload %d not in rtx-pt-map", payload);
652
653       GST_DEBUG_OBJECT (rtx,
654           "got caps for payload: %d->%d, ssrc: %u->%u : %" GST_PTR_FORMAT,
655           payload, GPOINTER_TO_INT (rtx_payload), ssrc, data->rtx_ssrc, caps);
656
657       gst_structure_get_int (s, "clock-rate", &data->clock_rate);
658
659       /* The session might need to know the RTX ssrc */
660       caps = gst_caps_copy (caps);
661       gst_caps_set_simple (caps, "rtx-ssrc", G_TYPE_UINT, data->rtx_ssrc,
662           "rtx-seqnum-offset", G_TYPE_UINT, data->seqnum_base, NULL);
663
664       if (GPOINTER_TO_INT (rtx_payload) != -1)
665         gst_caps_set_simple (caps, "rtx-payload", G_TYPE_INT,
666             GPOINTER_TO_INT (rtx_payload), NULL);
667
668       GST_DEBUG_OBJECT (rtx, "got clock-rate from caps: %d for ssrc: %u",
669           data->clock_rate, ssrc);
670       GST_OBJECT_UNLOCK (rtx);
671
672       gst_event_unref (event);
673       event = gst_event_new_caps (caps);
674       gst_caps_unref (caps);
675       break;
676     }
677     default:
678       break;
679   }
680   return gst_pad_event_default (pad, parent, event);
681 }
682
683 /* like rtp_jitter_buffer_get_ts_diff() */
684 static guint32
685 gst_rtp_rtx_send_get_ts_diff (SSRCRtxData * data)
686 {
687   guint64 high_ts, low_ts;
688   BufferQueueItem *high_buf, *low_buf;
689   guint32 result;
690
691   high_buf =
692       g_sequence_get (g_sequence_iter_prev (g_sequence_get_end_iter
693           (data->queue)));
694   low_buf = g_sequence_get (g_sequence_get_begin_iter (data->queue));
695
696   if (!high_buf || !low_buf || high_buf == low_buf)
697     return 0;
698
699   high_ts = high_buf->timestamp;
700   low_ts = low_buf->timestamp;
701
702   /* it needs to work if ts wraps */
703   if (high_ts >= low_ts) {
704     result = (guint32) (high_ts - low_ts);
705   } else {
706     result = (guint32) (high_ts + G_MAXUINT32 + 1 - low_ts);
707   }
708
709   /* return value in ms instead of clock ticks */
710   return (guint32) gst_util_uint64_scale_int (result, 1000, data->clock_rate);
711 }
712
713 /* Must be called with lock */
714 static void
715 process_buffer (GstRtpRtxSend * rtx, GstBuffer * buffer)
716 {
717   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
718   BufferQueueItem *item;
719   SSRCRtxData *data;
720   guint16 seqnum;
721   guint8 payload_type;
722   guint32 ssrc, rtptime;
723
724   /* read the information we want from the buffer */
725   gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
726   seqnum = gst_rtp_buffer_get_seq (&rtp);
727   payload_type = gst_rtp_buffer_get_payload_type (&rtp);
728   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
729   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
730   gst_rtp_buffer_unmap (&rtp);
731
732   GST_TRACE_OBJECT (rtx, "Processing buffer seqnum: %u, ssrc: %X", seqnum,
733       ssrc);
734
735   /* do not store the buffer if it's payload type is unknown */
736   if (g_hash_table_contains (rtx->rtx_pt_map, GUINT_TO_POINTER (payload_type))) {
737     data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
738
739     if (data->clock_rate == 0 && rtx->clock_rate_map_structure) {
740       data->clock_rate =
741           GPOINTER_TO_INT (g_hash_table_lookup (rtx->clock_rate_map,
742               GUINT_TO_POINTER (payload_type)));
743     }
744
745     /* add current rtp buffer to queue history */
746     item = g_slice_new0 (BufferQueueItem);
747     item->seqnum = seqnum;
748     item->timestamp = rtptime;
749     item->buffer = gst_buffer_ref (buffer);
750     g_sequence_append (data->queue, item);
751
752     /* remove oldest packets from history if they are too many */
753     if (rtx->max_size_packets) {
754       while (g_sequence_get_length (data->queue) > rtx->max_size_packets)
755         g_sequence_remove (g_sequence_get_begin_iter (data->queue));
756     }
757     if (rtx->max_size_time) {
758       while (gst_rtp_rtx_send_get_ts_diff (data) > rtx->max_size_time)
759         g_sequence_remove (g_sequence_get_begin_iter (data->queue));
760     }
761   }
762 }
763
764 static GstFlowReturn
765 gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
766 {
767   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
768   GstFlowReturn ret;
769
770   GST_OBJECT_LOCK (rtx);
771   process_buffer (rtx, buffer);
772   GST_OBJECT_UNLOCK (rtx);
773   ret = gst_pad_push (rtx->srcpad, buffer);
774
775   return ret;
776 }
777
778 static gboolean
779 process_buffer_from_list (GstBuffer ** buffer, guint idx, gpointer user_data)
780 {
781   process_buffer (user_data, *buffer);
782   return TRUE;
783 }
784
785 static GstFlowReturn
786 gst_rtp_rtx_send_chain_list (GstPad * pad, GstObject * parent,
787     GstBufferList * list)
788 {
789   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
790   GstFlowReturn ret;
791
792   GST_OBJECT_LOCK (rtx);
793   gst_buffer_list_foreach (list, process_buffer_from_list, rtx);
794   GST_OBJECT_UNLOCK (rtx);
795
796   ret = gst_pad_push_list (rtx->srcpad, list);
797
798   return ret;
799 }
800
801 static void
802 gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx)
803 {
804   GstDataQueueItem *data;
805
806   if (gst_data_queue_pop (rtx->queue, &data)) {
807     GST_LOG_OBJECT (rtx, "pushing rtx buffer %p", data->object);
808
809     if (G_LIKELY (GST_IS_BUFFER (data->object))) {
810       GST_OBJECT_LOCK (rtx);
811       /* Update statistics just before pushing. */
812       rtx->num_rtx_packets++;
813       GST_OBJECT_UNLOCK (rtx);
814
815       gst_pad_push (rtx->srcpad, GST_BUFFER (data->object));
816     } else if (GST_IS_EVENT (data->object)) {
817       gst_pad_push_event (rtx->srcpad, GST_EVENT (data->object));
818
819       /* after EOS, we should not send any more buffers,
820        * even if there are more requests coming in */
821       if (GST_EVENT_TYPE (data->object) == GST_EVENT_EOS) {
822         gst_rtp_rtx_send_set_flushing (rtx, TRUE);
823       }
824     } else {
825       g_assert_not_reached ();
826     }
827
828     data->object = NULL;        /* we no longer own that object */
829     data->destroy (data);
830   } else {
831     GST_LOG_OBJECT (rtx, "flushing");
832     gst_pad_pause_task (rtx->srcpad);
833   }
834 }
835
836 static gboolean
837 gst_rtp_rtx_send_activate_mode (GstPad * pad, GstObject * parent,
838     GstPadMode mode, gboolean active)
839 {
840   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
841   gboolean ret = FALSE;
842
843   switch (mode) {
844     case GST_PAD_MODE_PUSH:
845       if (active) {
846         gst_rtp_rtx_send_set_flushing (rtx, FALSE);
847         ret = gst_pad_start_task (rtx->srcpad,
848             (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
849       } else {
850         gst_rtp_rtx_send_set_flushing (rtx, TRUE);
851         ret = gst_pad_stop_task (rtx->srcpad);
852       }
853       GST_INFO_OBJECT (rtx, "activate_mode: active %d, ret %d", active, ret);
854       break;
855     default:
856       break;
857   }
858   return ret;
859 }
860
861 static void
862 gst_rtp_rtx_send_get_property (GObject * object,
863     guint prop_id, GValue * value, GParamSpec * pspec)
864 {
865   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (object);
866
867   switch (prop_id) {
868     case PROP_PAYLOAD_TYPE_MAP:
869       GST_OBJECT_LOCK (rtx);
870       g_value_set_boxed (value, rtx->rtx_pt_map_structure);
871       GST_OBJECT_UNLOCK (rtx);
872       break;
873     case PROP_MAX_SIZE_TIME:
874       GST_OBJECT_LOCK (rtx);
875       g_value_set_uint (value, rtx->max_size_time);
876       GST_OBJECT_UNLOCK (rtx);
877       break;
878     case PROP_MAX_SIZE_PACKETS:
879       GST_OBJECT_LOCK (rtx);
880       g_value_set_uint (value, rtx->max_size_packets);
881       GST_OBJECT_UNLOCK (rtx);
882       break;
883     case PROP_NUM_RTX_REQUESTS:
884       GST_OBJECT_LOCK (rtx);
885       g_value_set_uint (value, rtx->num_rtx_requests);
886       GST_OBJECT_UNLOCK (rtx);
887       break;
888     case PROP_NUM_RTX_PACKETS:
889       GST_OBJECT_LOCK (rtx);
890       g_value_set_uint (value, rtx->num_rtx_packets);
891       GST_OBJECT_UNLOCK (rtx);
892       break;
893     case PROP_CLOCK_RATE_MAP:
894       GST_OBJECT_LOCK (rtx);
895       g_value_set_boxed (value, rtx->clock_rate_map_structure);
896       GST_OBJECT_UNLOCK (rtx);
897       break;
898     default:
899       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
900       break;
901   }
902 }
903
904 static gboolean
905 structure_to_hash_table (GQuark field_id, const GValue * value, gpointer hash)
906 {
907   const gchar *field_str;
908   guint field_uint;
909   guint value_uint;
910
911   field_str = g_quark_to_string (field_id);
912   field_uint = atoi (field_str);
913   value_uint = g_value_get_uint (value);
914   g_hash_table_insert ((GHashTable *) hash, GUINT_TO_POINTER (field_uint),
915       GUINT_TO_POINTER (value_uint));
916
917   return TRUE;
918 }
919
920 static void
921 gst_rtp_rtx_send_set_property (GObject * object,
922     guint prop_id, const GValue * value, GParamSpec * pspec)
923 {
924   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (object);
925
926   switch (prop_id) {
927     case PROP_SSRC_MAP:
928       GST_OBJECT_LOCK (rtx);
929       if (rtx->external_ssrc_map)
930         gst_structure_free (rtx->external_ssrc_map);
931       rtx->external_ssrc_map = g_value_dup_boxed (value);
932       GST_OBJECT_UNLOCK (rtx);
933       break;
934     case PROP_PAYLOAD_TYPE_MAP:
935       GST_OBJECT_LOCK (rtx);
936       if (rtx->rtx_pt_map_structure)
937         gst_structure_free (rtx->rtx_pt_map_structure);
938       rtx->rtx_pt_map_structure = g_value_dup_boxed (value);
939       g_hash_table_remove_all (rtx->rtx_pt_map);
940       gst_structure_foreach (rtx->rtx_pt_map_structure, structure_to_hash_table,
941           rtx->rtx_pt_map);
942       GST_OBJECT_UNLOCK (rtx);
943       break;
944     case PROP_MAX_SIZE_TIME:
945       GST_OBJECT_LOCK (rtx);
946       rtx->max_size_time = g_value_get_uint (value);
947       GST_OBJECT_UNLOCK (rtx);
948       break;
949     case PROP_MAX_SIZE_PACKETS:
950       GST_OBJECT_LOCK (rtx);
951       rtx->max_size_packets = g_value_get_uint (value);
952       GST_OBJECT_UNLOCK (rtx);
953       break;
954     case PROP_CLOCK_RATE_MAP:
955       GST_OBJECT_LOCK (rtx);
956       if (rtx->clock_rate_map_structure)
957         gst_structure_free (rtx->clock_rate_map_structure);
958       rtx->clock_rate_map_structure = g_value_dup_boxed (value);
959       g_hash_table_remove_all (rtx->clock_rate_map);
960       gst_structure_foreach (rtx->clock_rate_map_structure,
961           structure_to_hash_table, rtx->clock_rate_map);
962       GST_OBJECT_UNLOCK (rtx);
963       break;
964     default:
965       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
966       break;
967   }
968 }
969
970 static GstStateChangeReturn
971 gst_rtp_rtx_send_change_state (GstElement * element, GstStateChange transition)
972 {
973   GstStateChangeReturn ret;
974   GstRtpRtxSend *rtx;
975
976   rtx = GST_RTP_RTX_SEND (element);
977
978   switch (transition) {
979     default:
980       break;
981   }
982
983   ret =
984       GST_ELEMENT_CLASS (gst_rtp_rtx_send_parent_class)->change_state (element,
985       transition);
986
987   switch (transition) {
988     case GST_STATE_CHANGE_PAUSED_TO_READY:
989       gst_rtp_rtx_send_reset (rtx);
990       break;
991     default:
992       break;
993   }
994
995   return ret;
996 }
997
998 gboolean
999 gst_rtp_rtx_send_plugin_init (GstPlugin * plugin)
1000 {
1001   GST_DEBUG_CATEGORY_INIT (gst_rtp_rtx_send_debug, "rtprtxsend", 0,
1002       "rtp retransmission sender");
1003
1004   return gst_element_register (plugin, "rtprtxsend", GST_RANK_NONE,
1005       GST_TYPE_RTP_RTX_SEND);
1006 }