Move stereo plugin from -bad
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtprtxsend.c
1 /* RTP Retransmission sender element for GStreamer
2  *
3  * gstrtprtxsend.c:
4  *
5  * Copyright (C) 2013 Collabora Ltd.
6  *   @author Julien Isorce <julien.isorce@collabora.co.uk>
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23
24 /**
25  * SECTION:element-rtprtxsend
26  *
27  * See #GstRtpRtxReceive for examples
28  * 
29  * The purpose of the sender RTX object is to keep a history of RTP packets up
30  * to a configurable limit (max-size-time or max-size-packets). It will listen
31  * for upstream custom retransmission events (GstRTPRetransmissionRequest) that
32  * comes from downstream (#GstRtpSession). When receiving a request it will
33  * look up the requested seqnum in its list of stored packets. If the packet
34  * is available, it will create a RTX packet according to RFC 4588 and send
35  * this as an auxiliary stream. RTX is SSRC-multiplexed
36  */
37
38 #ifdef HAVE_CONFIG_H
39 #include "config.h"
40 #endif
41
42 #include <gst/gst.h>
43 #include <gst/rtp/gstrtpbuffer.h>
44 #include <string.h>
45 #include <stdlib.h>
46
47 #include "gstrtprtxsend.h"
48
49 GST_DEBUG_CATEGORY_STATIC (gst_rtp_rtx_send_debug);
50 #define GST_CAT_DEFAULT gst_rtp_rtx_send_debug
51
52 #define DEFAULT_RTX_PAYLOAD_TYPE 0
53 #define DEFAULT_MAX_SIZE_TIME    0
54 #define DEFAULT_MAX_SIZE_PACKETS 100
55
56 enum
57 {
58   PROP_0,
59   PROP_SSRC_MAP,
60   PROP_PAYLOAD_TYPE_MAP,
61   PROP_MAX_SIZE_TIME,
62   PROP_MAX_SIZE_PACKETS,
63   PROP_NUM_RTX_REQUESTS,
64   PROP_NUM_RTX_PACKETS
65 };
66
67 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
68     GST_PAD_SRC,
69     GST_PAD_ALWAYS,
70     GST_STATIC_CAPS ("application/x-rtp")
71     );
72
73 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink",
74     GST_PAD_SINK,
75     GST_PAD_ALWAYS,
76     GST_STATIC_CAPS ("application/x-rtp, " "clock-rate = (int) [1, MAX]")
77     );
78
79 static gboolean gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue,
80     guint visible, guint bytes, guint64 time, gpointer checkdata);
81
82 static gboolean gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent,
83     GstEvent * event);
84 static gboolean gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent,
85     GstEvent * event);
86 static GstFlowReturn gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent,
87     GstBuffer * buffer);
88 static GstFlowReturn gst_rtp_rtx_send_chain_list (GstPad * pad,
89     GstObject * parent, GstBufferList * list);
90
91 static void gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx);
92 static gboolean gst_rtp_rtx_send_activate_mode (GstPad * pad,
93     GstObject * parent, GstPadMode mode, gboolean active);
94
95 static GstStateChangeReturn gst_rtp_rtx_send_change_state (GstElement *
96     element, GstStateChange transition);
97
98 static void gst_rtp_rtx_send_set_property (GObject * object, guint prop_id,
99     const GValue * value, GParamSpec * pspec);
100 static void gst_rtp_rtx_send_get_property (GObject * object, guint prop_id,
101     GValue * value, GParamSpec * pspec);
102 static void gst_rtp_rtx_send_finalize (GObject * object);
103
104 G_DEFINE_TYPE (GstRtpRtxSend, gst_rtp_rtx_send, GST_TYPE_ELEMENT);
105
106 typedef struct
107 {
108   guint16 seqnum;
109   guint32 timestamp;
110   GstBuffer *buffer;
111 } BufferQueueItem;
112
113 static void
114 buffer_queue_item_free (BufferQueueItem * item)
115 {
116   gst_buffer_unref (item->buffer);
117   g_slice_free (BufferQueueItem, item);
118 }
119
120 typedef struct
121 {
122   guint32 rtx_ssrc;
123   guint16 seqnum_base, next_seqnum;
124   gint clock_rate;
125
126   /* history of rtp packets */
127   GSequence *queue;
128 } SSRCRtxData;
129
130 static SSRCRtxData *
131 ssrc_rtx_data_new (guint32 rtx_ssrc)
132 {
133   SSRCRtxData *data = g_slice_new0 (SSRCRtxData);
134
135   data->rtx_ssrc = rtx_ssrc;
136   data->next_seqnum = data->seqnum_base = g_random_int_range (0, G_MAXUINT16);
137   data->queue = g_sequence_new ((GDestroyNotify) buffer_queue_item_free);
138
139   return data;
140 }
141
142 static void
143 ssrc_rtx_data_free (SSRCRtxData * data)
144 {
145   g_sequence_free (data->queue);
146   g_slice_free (SSRCRtxData, data);
147 }
148
149 static void
150 gst_rtp_rtx_send_class_init (GstRtpRtxSendClass * klass)
151 {
152   GObjectClass *gobject_class;
153   GstElementClass *gstelement_class;
154
155   gobject_class = (GObjectClass *) klass;
156   gstelement_class = (GstElementClass *) klass;
157
158   gobject_class->get_property = gst_rtp_rtx_send_get_property;
159   gobject_class->set_property = gst_rtp_rtx_send_set_property;
160   gobject_class->finalize = gst_rtp_rtx_send_finalize;
161
162   g_object_class_install_property (gobject_class, PROP_SSRC_MAP,
163       g_param_spec_boxed ("ssrc-map", "SSRC Map",
164           "Map of SSRCs to their retransmission SSRCs for SSRC-multiplexed mode"
165           " (default = random)", GST_TYPE_STRUCTURE,
166           G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS));
167
168   g_object_class_install_property (gobject_class, PROP_PAYLOAD_TYPE_MAP,
169       g_param_spec_boxed ("payload-type-map", "Payload Type Map",
170           "Map of original payload types to their retransmission payload types",
171           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
172
173   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
174       g_param_spec_uint ("max-size-time", "Max Size Time",
175           "Amount of ms to queue (0 = unlimited)", 0, G_MAXUINT,
176           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
177
178   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_PACKETS,
179       g_param_spec_uint ("max-size-packets", "Max Size Packets",
180           "Amount of packets to queue (0 = unlimited)", 0, G_MAXINT16,
181           DEFAULT_MAX_SIZE_PACKETS,
182           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
183
184   g_object_class_install_property (gobject_class, PROP_NUM_RTX_REQUESTS,
185       g_param_spec_uint ("num-rtx-requests", "Num RTX Requests",
186           "Number of retransmission events received", 0, G_MAXUINT,
187           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
188
189   g_object_class_install_property (gobject_class, PROP_NUM_RTX_PACKETS,
190       g_param_spec_uint ("num-rtx-packets", "Num RTX Packets",
191           " Number of retransmission packets sent", 0, G_MAXUINT,
192           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
193
194   gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
195   gst_element_class_add_static_pad_template (gstelement_class, &sink_factory);
196
197   gst_element_class_set_static_metadata (gstelement_class,
198       "RTP Retransmission Sender", "Codec",
199       "Retransmit RTP packets when needed, according to RFC4588",
200       "Julien Isorce <julien.isorce@collabora.co.uk>");
201
202   gstelement_class->change_state =
203       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_change_state);
204 }
205
206 static void
207 gst_rtp_rtx_send_reset (GstRtpRtxSend * rtx)
208 {
209   GST_OBJECT_LOCK (rtx);
210   gst_data_queue_flush (rtx->queue);
211   g_hash_table_remove_all (rtx->ssrc_data);
212   g_hash_table_remove_all (rtx->rtx_ssrcs);
213   rtx->num_rtx_requests = 0;
214   rtx->num_rtx_packets = 0;
215   GST_OBJECT_UNLOCK (rtx);
216 }
217
218 static void
219 gst_rtp_rtx_send_finalize (GObject * object)
220 {
221   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (object);
222
223   g_hash_table_unref (rtx->ssrc_data);
224   g_hash_table_unref (rtx->rtx_ssrcs);
225   if (rtx->external_ssrc_map)
226     gst_structure_free (rtx->external_ssrc_map);
227   g_hash_table_unref (rtx->rtx_pt_map);
228   if (rtx->rtx_pt_map_structure)
229     gst_structure_free (rtx->rtx_pt_map_structure);
230   g_object_unref (rtx->queue);
231
232   G_OBJECT_CLASS (gst_rtp_rtx_send_parent_class)->finalize (object);
233 }
234
235 static void
236 gst_rtp_rtx_send_init (GstRtpRtxSend * rtx)
237 {
238   GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx);
239
240   rtx->srcpad =
241       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
242           "src"), "src");
243   GST_PAD_SET_PROXY_CAPS (rtx->srcpad);
244   GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad);
245   gst_pad_set_event_function (rtx->srcpad,
246       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_src_event));
247   gst_pad_set_activatemode_function (rtx->srcpad,
248       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_activate_mode));
249   gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad);
250
251   rtx->sinkpad =
252       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
253           "sink"), "sink");
254   GST_PAD_SET_PROXY_CAPS (rtx->sinkpad);
255   GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
256   gst_pad_set_event_function (rtx->sinkpad,
257       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_sink_event));
258   gst_pad_set_chain_function (rtx->sinkpad,
259       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_chain));
260   gst_pad_set_chain_list_function (rtx->sinkpad,
261       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_chain_list));
262   gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
263
264   rtx->queue = gst_data_queue_new (gst_rtp_rtx_send_queue_check_full, NULL,
265       NULL, rtx);
266   rtx->ssrc_data = g_hash_table_new_full (g_direct_hash, g_direct_equal,
267       NULL, (GDestroyNotify) ssrc_rtx_data_free);
268   rtx->rtx_ssrcs = g_hash_table_new (g_direct_hash, g_direct_equal);
269   rtx->rtx_pt_map = g_hash_table_new (g_direct_hash, g_direct_equal);
270
271   rtx->max_size_time = DEFAULT_MAX_SIZE_TIME;
272   rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS;
273 }
274
275 static void
276 gst_rtp_rtx_send_set_flushing (GstRtpRtxSend * rtx, gboolean flush)
277 {
278   GST_OBJECT_LOCK (rtx);
279   gst_data_queue_set_flushing (rtx->queue, flush);
280   gst_data_queue_flush (rtx->queue);
281   GST_OBJECT_UNLOCK (rtx);
282 }
283
284 static gboolean
285 gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue,
286     guint visible, guint bytes, guint64 time, gpointer checkdata)
287 {
288   return FALSE;
289 }
290
291 static void
292 gst_rtp_rtx_data_queue_item_free (gpointer item)
293 {
294   GstDataQueueItem *data = item;
295   if (data->object)
296     gst_mini_object_unref (data->object);
297   g_slice_free (GstDataQueueItem, data);
298 }
299
300 static gboolean
301 gst_rtp_rtx_send_push_out (GstRtpRtxSend * rtx, gpointer object)
302 {
303   GstDataQueueItem *data;
304   gboolean success;
305
306   data = g_slice_new0 (GstDataQueueItem);
307   data->object = GST_MINI_OBJECT (object);
308   data->size = 1;
309   data->duration = 1;
310   data->visible = TRUE;
311   data->destroy = gst_rtp_rtx_data_queue_item_free;
312
313   success = gst_data_queue_push (rtx->queue, data);
314
315   if (!success)
316     data->destroy (data);
317
318   return success;
319 }
320
321 static guint32
322 gst_rtp_rtx_send_choose_ssrc (GstRtpRtxSend * rtx, guint32 choice,
323     gboolean consider_choice)
324 {
325   guint32 ssrc = consider_choice ? choice : g_random_int ();
326
327   /* make sure to be different than any other */
328   while (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc)) ||
329       g_hash_table_contains (rtx->rtx_ssrcs, GUINT_TO_POINTER (ssrc))) {
330     ssrc = g_random_int ();
331   }
332
333   return ssrc;
334 }
335
336 static SSRCRtxData *
337 gst_rtp_rtx_send_get_ssrc_data (GstRtpRtxSend * rtx, guint32 ssrc)
338 {
339   SSRCRtxData *data;
340   guint32 rtx_ssrc = 0;
341   gboolean consider = FALSE;
342
343   if (G_UNLIKELY (!g_hash_table_contains (rtx->ssrc_data,
344               GUINT_TO_POINTER (ssrc)))) {
345     if (rtx->external_ssrc_map) {
346       gchar *ssrc_str;
347       ssrc_str = g_strdup_printf ("%" G_GUINT32_FORMAT, ssrc);
348       consider = gst_structure_get_uint (rtx->external_ssrc_map, ssrc_str,
349           &rtx_ssrc);
350       g_free (ssrc_str);
351     }
352     rtx_ssrc = gst_rtp_rtx_send_choose_ssrc (rtx, rtx_ssrc, consider);
353     data = ssrc_rtx_data_new (rtx_ssrc);
354     g_hash_table_insert (rtx->ssrc_data, GUINT_TO_POINTER (ssrc), data);
355     g_hash_table_insert (rtx->rtx_ssrcs, GUINT_TO_POINTER (rtx_ssrc),
356         GUINT_TO_POINTER (ssrc));
357   } else {
358     data = g_hash_table_lookup (rtx->ssrc_data, GUINT_TO_POINTER (ssrc));
359   }
360   return data;
361 }
362
363 /* Copy fixed header and extension. Add OSN before to copy payload
364  * Copy memory to avoid to manually copy each rtp buffer field.
365  */
366 static GstBuffer *
367 gst_rtp_rtx_buffer_new (GstRtpRtxSend * rtx, GstBuffer * buffer)
368 {
369   GstMemory *mem = NULL;
370   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
371   GstRTPBuffer new_rtp = GST_RTP_BUFFER_INIT;
372   GstBuffer *new_buffer = gst_buffer_new ();
373   GstMapInfo map;
374   guint payload_len = 0;
375   SSRCRtxData *data;
376   guint32 ssrc;
377   guint16 seqnum;
378   guint8 fmtp;
379
380   gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
381
382   /* get needed data from GstRtpRtxSend */
383   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
384   data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
385   ssrc = data->rtx_ssrc;
386   seqnum = data->next_seqnum++;
387   fmtp = GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_pt_map,
388           GUINT_TO_POINTER (gst_rtp_buffer_get_payload_type (&rtp))));
389
390   GST_DEBUG_OBJECT (rtx, "creating rtx buffer, orig seqnum: %u, "
391       "rtx seqnum: %u, rtx ssrc: %X", gst_rtp_buffer_get_seq (&rtp),
392       seqnum, ssrc);
393
394   /* gst_rtp_buffer_map does not map the payload so do it now */
395   gst_rtp_buffer_get_payload (&rtp);
396
397   /* copy fixed header */
398   mem = gst_memory_copy (rtp.map[0].memory, 0, rtp.size[0]);
399   gst_buffer_append_memory (new_buffer, mem);
400
401   /* copy extension if any */
402   if (rtp.size[1]) {
403     mem = gst_allocator_alloc (NULL, rtp.size[1], NULL);
404     gst_memory_map (mem, &map, GST_MAP_WRITE);
405     memcpy (map.data, rtp.data[1], rtp.size[1]);
406     gst_memory_unmap (mem, &map);
407     gst_buffer_append_memory (new_buffer, mem);
408   }
409
410   /* copy payload and add OSN just before */
411   payload_len = 2 + rtp.size[2];
412   mem = gst_allocator_alloc (NULL, payload_len, NULL);
413
414   gst_memory_map (mem, &map, GST_MAP_WRITE);
415   GST_WRITE_UINT16_BE (map.data, gst_rtp_buffer_get_seq (&rtp));
416   if (rtp.size[2])
417     memcpy (map.data + 2, rtp.data[2], rtp.size[2]);
418   gst_memory_unmap (mem, &map);
419   gst_buffer_append_memory (new_buffer, mem);
420
421   /* everything needed is copied */
422   gst_rtp_buffer_unmap (&rtp);
423
424   /* set ssrc, seqnum and fmtp */
425   gst_rtp_buffer_map (new_buffer, GST_MAP_WRITE, &new_rtp);
426   gst_rtp_buffer_set_ssrc (&new_rtp, ssrc);
427   gst_rtp_buffer_set_seq (&new_rtp, seqnum);
428   gst_rtp_buffer_set_payload_type (&new_rtp, fmtp);
429   /* RFC 4588: let other elements do the padding, as normal */
430   gst_rtp_buffer_set_padding (&new_rtp, FALSE);
431   gst_rtp_buffer_unmap (&new_rtp);
432
433   /* Copy over timestamps */
434   gst_buffer_copy_into (new_buffer, buffer, GST_BUFFER_COPY_TIMESTAMPS, 0, -1);
435
436   return new_buffer;
437 }
438
439 static gint
440 buffer_queue_items_cmp (BufferQueueItem * a, BufferQueueItem * b,
441     gpointer user_data)
442 {
443   /* gst_rtp_buffer_compare_seqnum returns the opposite of what we want,
444    * it returns negative when seqnum1 > seqnum2 and we want negative
445    * when b > a, i.e. a is smaller, so it comes first in the sequence */
446   return gst_rtp_buffer_compare_seqnum (b->seqnum, a->seqnum);
447 }
448
449 static gboolean
450 gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
451 {
452   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
453   gboolean res;
454
455   switch (GST_EVENT_TYPE (event)) {
456     case GST_EVENT_CUSTOM_UPSTREAM:
457     {
458       const GstStructure *s = gst_event_get_structure (event);
459
460       /* This event usually comes from the downstream gstrtpsession */
461       if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
462         guint seqnum = 0;
463         guint ssrc = 0;
464         GstBuffer *rtx_buf = NULL;
465
466         /* retrieve seqnum of the packet that need to be retransmitted */
467         if (!gst_structure_get_uint (s, "seqnum", &seqnum))
468           seqnum = -1;
469
470         /* retrieve ssrc of the packet that need to be retransmitted */
471         if (!gst_structure_get_uint (s, "ssrc", &ssrc))
472           ssrc = -1;
473
474         GST_DEBUG_OBJECT (rtx, "got rtx request for seqnum: %u, ssrc: %X",
475             seqnum, ssrc);
476
477         GST_OBJECT_LOCK (rtx);
478         /* check if request is for us */
479         if (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc))) {
480           SSRCRtxData *data;
481           GSequenceIter *iter;
482           BufferQueueItem search_item;
483
484           /* update statistics */
485           ++rtx->num_rtx_requests;
486
487           data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
488
489           search_item.seqnum = seqnum;
490           iter = g_sequence_lookup (data->queue, &search_item,
491               (GCompareDataFunc) buffer_queue_items_cmp, NULL);
492           if (iter) {
493             BufferQueueItem *item = g_sequence_get (iter);
494             GST_LOG_OBJECT (rtx, "found %u", item->seqnum);
495             rtx_buf = gst_rtp_rtx_buffer_new (rtx, item->buffer);
496           }
497         }
498         GST_OBJECT_UNLOCK (rtx);
499
500         if (rtx_buf)
501           gst_rtp_rtx_send_push_out (rtx, rtx_buf);
502
503         gst_event_unref (event);
504         res = TRUE;
505
506         /* This event usually comes from the downstream gstrtpsession */
507       } else if (gst_structure_has_name (s, "GstRTPCollision")) {
508         guint ssrc = 0;
509
510         if (!gst_structure_get_uint (s, "ssrc", &ssrc))
511           ssrc = -1;
512
513         GST_DEBUG_OBJECT (rtx, "got ssrc collision, ssrc: %X", ssrc);
514
515         GST_OBJECT_LOCK (rtx);
516
517         /* choose another ssrc for our retransmited stream */
518         if (g_hash_table_contains (rtx->rtx_ssrcs, GUINT_TO_POINTER (ssrc))) {
519           guint master_ssrc;
520           SSRCRtxData *data;
521
522           master_ssrc = GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_ssrcs,
523                   GUINT_TO_POINTER (ssrc)));
524           data = gst_rtp_rtx_send_get_ssrc_data (rtx, master_ssrc);
525
526           /* change rtx_ssrc and update the reverse map */
527           data->rtx_ssrc = gst_rtp_rtx_send_choose_ssrc (rtx, 0, FALSE);
528           g_hash_table_remove (rtx->rtx_ssrcs, GUINT_TO_POINTER (ssrc));
529           g_hash_table_insert (rtx->rtx_ssrcs,
530               GUINT_TO_POINTER (data->rtx_ssrc),
531               GUINT_TO_POINTER (master_ssrc));
532
533           GST_OBJECT_UNLOCK (rtx);
534
535           /* no need to forward to payloader because we make sure to have
536            * a different ssrc
537            */
538           gst_event_unref (event);
539           res = TRUE;
540         } else {
541           /* if master ssrc has collided, remove it from our data, as it
542            * is not going to be used any longer */
543           if (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc))) {
544             SSRCRtxData *data;
545             data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
546             g_hash_table_remove (rtx->rtx_ssrcs,
547                 GUINT_TO_POINTER (data->rtx_ssrc));
548             g_hash_table_remove (rtx->ssrc_data, GUINT_TO_POINTER (ssrc));
549           }
550
551           GST_OBJECT_UNLOCK (rtx);
552
553           /* forward event to payloader in case collided ssrc is
554            * master stream */
555           res = gst_pad_event_default (pad, parent, event);
556         }
557       } else {
558         res = gst_pad_event_default (pad, parent, event);
559       }
560       break;
561     }
562     default:
563       res = gst_pad_event_default (pad, parent, event);
564       break;
565   }
566   return res;
567 }
568
569 static gboolean
570 gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
571 {
572   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
573
574   switch (GST_EVENT_TYPE (event)) {
575     case GST_EVENT_FLUSH_START:
576       gst_pad_push_event (rtx->srcpad, event);
577       gst_rtp_rtx_send_set_flushing (rtx, TRUE);
578       gst_pad_pause_task (rtx->srcpad);
579       return TRUE;
580     case GST_EVENT_FLUSH_STOP:
581       gst_pad_push_event (rtx->srcpad, event);
582       gst_rtp_rtx_send_set_flushing (rtx, FALSE);
583       gst_pad_start_task (rtx->srcpad,
584           (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
585       return TRUE;
586     case GST_EVENT_EOS:
587       GST_INFO_OBJECT (rtx, "Got EOS - enqueueing it");
588       gst_rtp_rtx_send_push_out (rtx, event);
589       return TRUE;
590     case GST_EVENT_CAPS:
591     {
592       GstCaps *caps;
593       GstStructure *s;
594       guint ssrc;
595       gint payload;
596       gpointer rtx_payload;
597       SSRCRtxData *data;
598
599       gst_event_parse_caps (event, &caps);
600
601       s = gst_caps_get_structure (caps, 0);
602       if (!gst_structure_get_uint (s, "ssrc", &ssrc))
603         ssrc = -1;
604       if (!gst_structure_get_int (s, "payload", &payload))
605         payload = -1;
606
607       if (payload == -1)
608         GST_WARNING_OBJECT (rtx, "No payload in caps");
609
610       GST_OBJECT_LOCK (rtx);
611       data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
612       if (!g_hash_table_lookup_extended (rtx->rtx_pt_map,
613               GUINT_TO_POINTER (payload), NULL, &rtx_payload))
614         rtx_payload = GINT_TO_POINTER (-1);
615
616       if (GPOINTER_TO_INT (rtx_payload) == -1 && payload != -1)
617         GST_WARNING_OBJECT (rtx, "Payload %d not in rtx-pt-map", payload);
618
619       GST_DEBUG_OBJECT (rtx,
620           "got caps for payload: %d->%d, ssrc: %u->%u : %" GST_PTR_FORMAT,
621           payload, GPOINTER_TO_INT (rtx_payload), ssrc, data->rtx_ssrc, caps);
622
623       gst_structure_get_int (s, "clock-rate", &data->clock_rate);
624
625       /* The session might need to know the RTX ssrc */
626       caps = gst_caps_copy (caps);
627       gst_caps_set_simple (caps, "rtx-ssrc", G_TYPE_UINT, data->rtx_ssrc,
628           "rtx-seqnum-offset", G_TYPE_UINT, data->seqnum_base, NULL);
629
630       if (GPOINTER_TO_INT (rtx_payload) != -1)
631         gst_caps_set_simple (caps, "rtx-payload", G_TYPE_INT,
632             GPOINTER_TO_INT (rtx_payload), NULL);
633
634       GST_DEBUG_OBJECT (rtx, "got clock-rate from caps: %d for ssrc: %u",
635           data->clock_rate, ssrc);
636       GST_OBJECT_UNLOCK (rtx);
637
638       gst_event_unref (event);
639       event = gst_event_new_caps (caps);
640       gst_caps_unref (caps);
641       break;
642     }
643     default:
644       break;
645   }
646   return gst_pad_event_default (pad, parent, event);
647 }
648
649 /* like rtp_jitter_buffer_get_ts_diff() */
650 static guint32
651 gst_rtp_rtx_send_get_ts_diff (SSRCRtxData * data)
652 {
653   guint64 high_ts, low_ts;
654   BufferQueueItem *high_buf, *low_buf;
655   guint32 result;
656
657   high_buf =
658       g_sequence_get (g_sequence_iter_prev (g_sequence_get_end_iter
659           (data->queue)));
660   low_buf = g_sequence_get (g_sequence_get_begin_iter (data->queue));
661
662   if (!high_buf || !low_buf || high_buf == low_buf)
663     return 0;
664
665   high_ts = high_buf->timestamp;
666   low_ts = low_buf->timestamp;
667
668   /* it needs to work if ts wraps */
669   if (high_ts >= low_ts) {
670     result = (guint32) (high_ts - low_ts);
671   } else {
672     result = (guint32) (high_ts + G_MAXUINT32 + 1 - low_ts);
673   }
674
675   /* return value in ms instead of clock ticks */
676   return (guint32) gst_util_uint64_scale_int (result, 1000, data->clock_rate);
677 }
678
679 /* Must be called with lock */
680 static void
681 process_buffer (GstRtpRtxSend * rtx, GstBuffer * buffer)
682 {
683   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
684   BufferQueueItem *item;
685   SSRCRtxData *data;
686   guint16 seqnum;
687   guint8 payload_type;
688   guint32 ssrc, rtptime;
689
690   /* read the information we want from the buffer */
691   gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
692   seqnum = gst_rtp_buffer_get_seq (&rtp);
693   payload_type = gst_rtp_buffer_get_payload_type (&rtp);
694   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
695   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
696   gst_rtp_buffer_unmap (&rtp);
697
698   GST_TRACE_OBJECT (rtx, "Processing buffer seqnum: %u, ssrc: %X", seqnum,
699       ssrc);
700
701   /* do not store the buffer if it's payload type is unknown */
702   if (g_hash_table_contains (rtx->rtx_pt_map, GUINT_TO_POINTER (payload_type))) {
703     data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
704
705     /* add current rtp buffer to queue history */
706     item = g_slice_new0 (BufferQueueItem);
707     item->seqnum = seqnum;
708     item->timestamp = rtptime;
709     item->buffer = gst_buffer_ref (buffer);
710     g_sequence_append (data->queue, item);
711
712     /* remove oldest packets from history if they are too many */
713     if (rtx->max_size_packets) {
714       while (g_sequence_get_length (data->queue) > rtx->max_size_packets)
715         g_sequence_remove (g_sequence_get_begin_iter (data->queue));
716     }
717     if (rtx->max_size_time) {
718       while (gst_rtp_rtx_send_get_ts_diff (data) > rtx->max_size_time)
719         g_sequence_remove (g_sequence_get_begin_iter (data->queue));
720     }
721   }
722 }
723
724 static GstFlowReturn
725 gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
726 {
727   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
728   GstFlowReturn ret;
729
730   GST_OBJECT_LOCK (rtx);
731   process_buffer (rtx, buffer);
732   GST_OBJECT_UNLOCK (rtx);
733   ret = gst_pad_push (rtx->srcpad, buffer);
734
735   return ret;
736 }
737
738 static gboolean
739 process_buffer_from_list (GstBuffer ** buffer, guint idx, gpointer user_data)
740 {
741   process_buffer (user_data, *buffer);
742   return TRUE;
743 }
744
745 static GstFlowReturn
746 gst_rtp_rtx_send_chain_list (GstPad * pad, GstObject * parent,
747     GstBufferList * list)
748 {
749   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
750   GstFlowReturn ret;
751
752   GST_OBJECT_LOCK (rtx);
753   gst_buffer_list_foreach (list, process_buffer_from_list, rtx);
754   GST_OBJECT_UNLOCK (rtx);
755
756   ret = gst_pad_push_list (rtx->srcpad, list);
757
758   return ret;
759 }
760
761 static void
762 gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx)
763 {
764   GstDataQueueItem *data;
765
766   if (gst_data_queue_pop (rtx->queue, &data)) {
767     GST_LOG_OBJECT (rtx, "pushing rtx buffer %p", data->object);
768
769     if (G_LIKELY (GST_IS_BUFFER (data->object))) {
770       GST_OBJECT_LOCK (rtx);
771       /* Update statistics just before pushing. */
772       rtx->num_rtx_packets++;
773       GST_OBJECT_UNLOCK (rtx);
774
775       gst_pad_push (rtx->srcpad, GST_BUFFER (data->object));
776     } else if (GST_IS_EVENT (data->object)) {
777       gst_pad_push_event (rtx->srcpad, GST_EVENT (data->object));
778
779       /* after EOS, we should not send any more buffers,
780        * even if there are more requests coming in */
781       if (GST_EVENT_TYPE (data->object) == GST_EVENT_EOS) {
782         gst_rtp_rtx_send_set_flushing (rtx, TRUE);
783       }
784     } else {
785       g_assert_not_reached ();
786     }
787
788     data->object = NULL;        /* we no longer own that object */
789     data->destroy (data);
790   } else {
791     GST_LOG_OBJECT (rtx, "flushing");
792     gst_pad_pause_task (rtx->srcpad);
793   }
794 }
795
796 static gboolean
797 gst_rtp_rtx_send_activate_mode (GstPad * pad, GstObject * parent,
798     GstPadMode mode, gboolean active)
799 {
800   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
801   gboolean ret = FALSE;
802
803   switch (mode) {
804     case GST_PAD_MODE_PUSH:
805       if (active) {
806         gst_rtp_rtx_send_set_flushing (rtx, FALSE);
807         ret = gst_pad_start_task (rtx->srcpad,
808             (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
809       } else {
810         gst_rtp_rtx_send_set_flushing (rtx, TRUE);
811         ret = gst_pad_stop_task (rtx->srcpad);
812       }
813       GST_INFO_OBJECT (rtx, "activate_mode: active %d, ret %d", active, ret);
814       break;
815     default:
816       break;
817   }
818   return ret;
819 }
820
821 static void
822 gst_rtp_rtx_send_get_property (GObject * object,
823     guint prop_id, GValue * value, GParamSpec * pspec)
824 {
825   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (object);
826
827   switch (prop_id) {
828     case PROP_PAYLOAD_TYPE_MAP:
829       GST_OBJECT_LOCK (rtx);
830       g_value_set_boxed (value, rtx->rtx_pt_map_structure);
831       GST_OBJECT_UNLOCK (rtx);
832       break;
833     case PROP_MAX_SIZE_TIME:
834       GST_OBJECT_LOCK (rtx);
835       g_value_set_uint (value, rtx->max_size_time);
836       GST_OBJECT_UNLOCK (rtx);
837       break;
838     case PROP_MAX_SIZE_PACKETS:
839       GST_OBJECT_LOCK (rtx);
840       g_value_set_uint (value, rtx->max_size_packets);
841       GST_OBJECT_UNLOCK (rtx);
842       break;
843     case PROP_NUM_RTX_REQUESTS:
844       GST_OBJECT_LOCK (rtx);
845       g_value_set_uint (value, rtx->num_rtx_requests);
846       GST_OBJECT_UNLOCK (rtx);
847       break;
848     case PROP_NUM_RTX_PACKETS:
849       GST_OBJECT_LOCK (rtx);
850       g_value_set_uint (value, rtx->num_rtx_packets);
851       GST_OBJECT_UNLOCK (rtx);
852       break;
853     default:
854       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
855       break;
856   }
857 }
858
859 static gboolean
860 structure_to_hash_table (GQuark field_id, const GValue * value, gpointer hash)
861 {
862   const gchar *field_str;
863   guint field_uint;
864   guint value_uint;
865
866   field_str = g_quark_to_string (field_id);
867   field_uint = atoi (field_str);
868   value_uint = g_value_get_uint (value);
869   g_hash_table_insert ((GHashTable *) hash, GUINT_TO_POINTER (field_uint),
870       GUINT_TO_POINTER (value_uint));
871
872   return TRUE;
873 }
874
875 static void
876 gst_rtp_rtx_send_set_property (GObject * object,
877     guint prop_id, const GValue * value, GParamSpec * pspec)
878 {
879   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (object);
880
881   switch (prop_id) {
882     case PROP_SSRC_MAP:
883       GST_OBJECT_LOCK (rtx);
884       if (rtx->external_ssrc_map)
885         gst_structure_free (rtx->external_ssrc_map);
886       rtx->external_ssrc_map = g_value_dup_boxed (value);
887       GST_OBJECT_UNLOCK (rtx);
888       break;
889     case PROP_PAYLOAD_TYPE_MAP:
890       GST_OBJECT_LOCK (rtx);
891       if (rtx->rtx_pt_map_structure)
892         gst_structure_free (rtx->rtx_pt_map_structure);
893       rtx->rtx_pt_map_structure = g_value_dup_boxed (value);
894       g_hash_table_remove_all (rtx->rtx_pt_map);
895       gst_structure_foreach (rtx->rtx_pt_map_structure, structure_to_hash_table,
896           rtx->rtx_pt_map);
897       GST_OBJECT_UNLOCK (rtx);
898       break;
899     case PROP_MAX_SIZE_TIME:
900       GST_OBJECT_LOCK (rtx);
901       rtx->max_size_time = g_value_get_uint (value);
902       GST_OBJECT_UNLOCK (rtx);
903       break;
904     case PROP_MAX_SIZE_PACKETS:
905       GST_OBJECT_LOCK (rtx);
906       rtx->max_size_packets = g_value_get_uint (value);
907       GST_OBJECT_UNLOCK (rtx);
908       break;
909     default:
910       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
911       break;
912   }
913 }
914
915 static GstStateChangeReturn
916 gst_rtp_rtx_send_change_state (GstElement * element, GstStateChange transition)
917 {
918   GstStateChangeReturn ret;
919   GstRtpRtxSend *rtx;
920
921   rtx = GST_RTP_RTX_SEND (element);
922
923   switch (transition) {
924     default:
925       break;
926   }
927
928   ret =
929       GST_ELEMENT_CLASS (gst_rtp_rtx_send_parent_class)->change_state (element,
930       transition);
931
932   switch (transition) {
933     case GST_STATE_CHANGE_PAUSED_TO_READY:
934       gst_rtp_rtx_send_reset (rtx);
935       break;
936     default:
937       break;
938   }
939
940   return ret;
941 }
942
943 gboolean
944 gst_rtp_rtx_send_plugin_init (GstPlugin * plugin)
945 {
946   GST_DEBUG_CATEGORY_INIT (gst_rtp_rtx_send_debug, "rtprtxsend", 0,
947       "rtp retransmission sender");
948
949   return gst_element_register (plugin, "rtprtxsend", GST_RANK_NONE,
950       GST_TYPE_RTP_RTX_SEND);
951 }