rtpsession: Track RTX ssrc caps
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtprtxsend.c
1 /* RTP Retransmission sender element for GStreamer
2  *
3  * gstrtprtxsend.c:
4  *
5  * Copyright (C) 2013 Collabora Ltd.
6  *   @author Julien Isorce <julien.isorce@collabora.co.uk>
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23
24 /**
25  * SECTION:element-rtprtxsend
26  *
27  * See #GstRtpRtxReceive for examples
28  * 
29  * The purpose of the sender RTX object is to keep a history of RTP packets up
30  * to a configurable limit (max-size-time or max-size-packets). It will listen
31  * for upstream custom retransmission events (GstRTPRetransmissionRequest) that
32  * comes from downstream (#GstRtpSession). When receiving a request it will
33  * look up the requested seqnum in its list of stored packets. If the packet
34  * is available, it will create a RTX packet according to RFC 4588 and send
35  * this as an auxiliary stream. RTX is SSRC-multiplexed
36  */
37
38 #ifdef HAVE_CONFIG_H
39 #include "config.h"
40 #endif
41
42 #include <gst/gst.h>
43 #include <gst/rtp/gstrtpbuffer.h>
44 #include <string.h>
45 #include <stdlib.h>
46
47 #include "gstrtprtxsend.h"
48
49 GST_DEBUG_CATEGORY_STATIC (gst_rtp_rtx_send_debug);
50 #define GST_CAT_DEFAULT gst_rtp_rtx_send_debug
51
52 #define DEFAULT_RTX_PAYLOAD_TYPE 0
53 #define DEFAULT_MAX_SIZE_TIME    0
54 #define DEFAULT_MAX_SIZE_PACKETS 100
55
56 enum
57 {
58   PROP_0,
59   PROP_SSRC_MAP,
60   PROP_PAYLOAD_TYPE_MAP,
61   PROP_MAX_SIZE_TIME,
62   PROP_MAX_SIZE_PACKETS,
63   PROP_NUM_RTX_REQUESTS,
64   PROP_NUM_RTX_PACKETS,
65   PROP_LAST
66 };
67
68 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
69     GST_PAD_SRC,
70     GST_PAD_ALWAYS,
71     GST_STATIC_CAPS ("application/x-rtp")
72     );
73
74 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink",
75     GST_PAD_SINK,
76     GST_PAD_ALWAYS,
77     GST_STATIC_CAPS ("application/x-rtp, " "clock-rate = (int) [1, MAX]")
78     );
79
80 static gboolean gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue,
81     guint visible, guint bytes, guint64 time, gpointer checkdata);
82
83 static gboolean gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent,
84     GstEvent * event);
85 static gboolean gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent,
86     GstEvent * event);
87 static GstFlowReturn gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent,
88     GstBuffer * buffer);
89 static GstFlowReturn gst_rtp_rtx_send_chain_list (GstPad * pad,
90     GstObject * parent, GstBufferList * list);
91
92 static void gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx);
93 static gboolean gst_rtp_rtx_send_activate_mode (GstPad * pad,
94     GstObject * parent, GstPadMode mode, gboolean active);
95
96 static GstStateChangeReturn gst_rtp_rtx_send_change_state (GstElement *
97     element, GstStateChange transition);
98
99 static void gst_rtp_rtx_send_set_property (GObject * object, guint prop_id,
100     const GValue * value, GParamSpec * pspec);
101 static void gst_rtp_rtx_send_get_property (GObject * object, guint prop_id,
102     GValue * value, GParamSpec * pspec);
103 static void gst_rtp_rtx_send_finalize (GObject * object);
104
105 G_DEFINE_TYPE (GstRtpRtxSend, gst_rtp_rtx_send, GST_TYPE_ELEMENT);
106
107 typedef struct
108 {
109   guint16 seqnum;
110   guint32 timestamp;
111   GstBuffer *buffer;
112 } BufferQueueItem;
113
114 static void
115 buffer_queue_item_free (BufferQueueItem * item)
116 {
117   gst_buffer_unref (item->buffer);
118   g_slice_free (BufferQueueItem, item);
119 }
120
121 typedef struct
122 {
123   guint32 rtx_ssrc;
124   guint16 next_seqnum;
125   gint clock_rate;
126
127   /* history of rtp packets */
128   GSequence *queue;
129 } SSRCRtxData;
130
131 static SSRCRtxData *
132 ssrc_rtx_data_new (guint32 rtx_ssrc)
133 {
134   SSRCRtxData *data = g_slice_new0 (SSRCRtxData);
135
136   data->rtx_ssrc = rtx_ssrc;
137   data->next_seqnum = g_random_int_range (0, G_MAXUINT16);
138   data->queue = g_sequence_new ((GDestroyNotify) buffer_queue_item_free);
139
140   return data;
141 }
142
143 static void
144 ssrc_rtx_data_free (SSRCRtxData * data)
145 {
146   g_sequence_free (data->queue);
147   g_slice_free (SSRCRtxData, data);
148 }
149
150 static void
151 gst_rtp_rtx_send_class_init (GstRtpRtxSendClass * klass)
152 {
153   GObjectClass *gobject_class;
154   GstElementClass *gstelement_class;
155
156   gobject_class = (GObjectClass *) klass;
157   gstelement_class = (GstElementClass *) klass;
158
159   gobject_class->get_property = gst_rtp_rtx_send_get_property;
160   gobject_class->set_property = gst_rtp_rtx_send_set_property;
161   gobject_class->finalize = gst_rtp_rtx_send_finalize;
162
163   g_object_class_install_property (gobject_class, PROP_SSRC_MAP,
164       g_param_spec_boxed ("ssrc-map", "SSRC Map",
165           "Map of SSRCs to their retransmission SSRCs for SSRC-multiplexed mode"
166           " (default = random)", GST_TYPE_STRUCTURE,
167           G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS));
168
169   g_object_class_install_property (gobject_class, PROP_PAYLOAD_TYPE_MAP,
170       g_param_spec_boxed ("payload-type-map", "Payload Type Map",
171           "Map of original payload types to their retransmission payload types",
172           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
173
174   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
175       g_param_spec_uint ("max-size-time", "Max Size Time",
176           "Amount of ms to queue (0 = unlimited)", 0, G_MAXUINT,
177           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
178
179   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_PACKETS,
180       g_param_spec_uint ("max-size-packets", "Max Size Packets",
181           "Amount of packets to queue (0 = unlimited)", 0, G_MAXINT16,
182           DEFAULT_MAX_SIZE_PACKETS,
183           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
184
185   g_object_class_install_property (gobject_class, PROP_NUM_RTX_REQUESTS,
186       g_param_spec_uint ("num-rtx-requests", "Num RTX Requests",
187           "Number of retransmission events received", 0, G_MAXUINT,
188           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
189
190   g_object_class_install_property (gobject_class, PROP_NUM_RTX_PACKETS,
191       g_param_spec_uint ("num-rtx-packets", "Num RTX Packets",
192           " Number of retransmission packets sent", 0, G_MAXUINT,
193           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
194
195   gst_element_class_add_pad_template (gstelement_class,
196       gst_static_pad_template_get (&src_factory));
197   gst_element_class_add_pad_template (gstelement_class,
198       gst_static_pad_template_get (&sink_factory));
199
200   gst_element_class_set_static_metadata (gstelement_class,
201       "RTP Retransmission Sender", "Codec",
202       "Retransmit RTP packets when needed, according to RFC4588",
203       "Julien Isorce <julien.isorce@collabora.co.uk>");
204
205   gstelement_class->change_state =
206       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_change_state);
207 }
208
209 static void
210 gst_rtp_rtx_send_reset (GstRtpRtxSend * rtx)
211 {
212   GST_OBJECT_LOCK (rtx);
213   gst_data_queue_flush (rtx->queue);
214   g_hash_table_remove_all (rtx->ssrc_data);
215   g_hash_table_remove_all (rtx->rtx_ssrcs);
216   rtx->num_rtx_requests = 0;
217   rtx->num_rtx_packets = 0;
218   GST_OBJECT_UNLOCK (rtx);
219 }
220
221 static void
222 gst_rtp_rtx_send_finalize (GObject * object)
223 {
224   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (object);
225
226   g_hash_table_unref (rtx->ssrc_data);
227   g_hash_table_unref (rtx->rtx_ssrcs);
228   if (rtx->external_ssrc_map)
229     gst_structure_free (rtx->external_ssrc_map);
230   g_hash_table_unref (rtx->rtx_pt_map);
231   if (rtx->rtx_pt_map_structure)
232     gst_structure_free (rtx->rtx_pt_map_structure);
233   g_object_unref (rtx->queue);
234
235   G_OBJECT_CLASS (gst_rtp_rtx_send_parent_class)->finalize (object);
236 }
237
238 static void
239 gst_rtp_rtx_send_init (GstRtpRtxSend * rtx)
240 {
241   GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx);
242
243   rtx->srcpad =
244       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
245           "src"), "src");
246   GST_PAD_SET_PROXY_CAPS (rtx->srcpad);
247   GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad);
248   gst_pad_set_event_function (rtx->srcpad,
249       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_src_event));
250   gst_pad_set_activatemode_function (rtx->srcpad,
251       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_activate_mode));
252   gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad);
253
254   rtx->sinkpad =
255       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
256           "sink"), "sink");
257   GST_PAD_SET_PROXY_CAPS (rtx->sinkpad);
258   GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
259   gst_pad_set_event_function (rtx->sinkpad,
260       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_sink_event));
261   gst_pad_set_chain_function (rtx->sinkpad,
262       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_chain));
263   gst_pad_set_chain_list_function (rtx->sinkpad,
264       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_chain_list));
265   gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
266
267   rtx->queue = gst_data_queue_new (gst_rtp_rtx_send_queue_check_full, NULL,
268       NULL, rtx);
269   rtx->ssrc_data = g_hash_table_new_full (g_direct_hash, g_direct_equal,
270       NULL, (GDestroyNotify) ssrc_rtx_data_free);
271   rtx->rtx_ssrcs = g_hash_table_new (g_direct_hash, g_direct_equal);
272   rtx->rtx_pt_map = g_hash_table_new (g_direct_hash, g_direct_equal);
273
274   rtx->max_size_time = DEFAULT_MAX_SIZE_TIME;
275   rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS;
276 }
277
278 static void
279 gst_rtp_rtx_send_set_flushing (GstRtpRtxSend * rtx, gboolean flush)
280 {
281   GST_OBJECT_LOCK (rtx);
282   gst_data_queue_set_flushing (rtx->queue, flush);
283   gst_data_queue_flush (rtx->queue);
284   GST_OBJECT_UNLOCK (rtx);
285 }
286
287 static gboolean
288 gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue,
289     guint visible, guint bytes, guint64 time, gpointer checkdata)
290 {
291   return FALSE;
292 }
293
294 static void
295 gst_rtp_rtx_data_queue_item_free (gpointer item)
296 {
297   GstDataQueueItem *data = item;
298   if (data->object)
299     gst_mini_object_unref (data->object);
300   g_slice_free (GstDataQueueItem, data);
301 }
302
303 static gboolean
304 gst_rtp_rtx_send_push_out (GstRtpRtxSend * rtx, gpointer object)
305 {
306   GstDataQueueItem *data;
307   gboolean success;
308
309   data = g_slice_new0 (GstDataQueueItem);
310   data->object = GST_MINI_OBJECT (object);
311   data->size = 1;
312   data->duration = 1;
313   data->visible = TRUE;
314   data->destroy = gst_rtp_rtx_data_queue_item_free;
315
316   success = gst_data_queue_push (rtx->queue, data);
317
318   if (!success)
319     data->destroy (data);
320
321   return success;
322 }
323
324 static guint32
325 gst_rtp_rtx_send_choose_ssrc (GstRtpRtxSend * rtx, guint32 choice,
326     gboolean consider_choice)
327 {
328   guint32 ssrc = consider_choice ? choice : g_random_int ();
329
330   /* make sure to be different than any other */
331   while (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc)) ||
332       g_hash_table_contains (rtx->rtx_ssrcs, GUINT_TO_POINTER (ssrc))) {
333     ssrc = g_random_int ();
334   }
335
336   return ssrc;
337 }
338
339 static SSRCRtxData *
340 gst_rtp_rtx_send_get_ssrc_data (GstRtpRtxSend * rtx, guint32 ssrc)
341 {
342   SSRCRtxData *data;
343   guint32 rtx_ssrc = 0;
344   gboolean consider = FALSE;
345
346   if (G_UNLIKELY (!g_hash_table_contains (rtx->ssrc_data,
347               GUINT_TO_POINTER (ssrc)))) {
348     if (rtx->external_ssrc_map) {
349       gchar *ssrc_str;
350       ssrc_str = g_strdup_printf ("%" G_GUINT32_FORMAT, ssrc);
351       consider = gst_structure_get_uint (rtx->external_ssrc_map, ssrc_str,
352           &rtx_ssrc);
353       g_free (ssrc_str);
354     }
355     rtx_ssrc = gst_rtp_rtx_send_choose_ssrc (rtx, rtx_ssrc, consider);
356     data = ssrc_rtx_data_new (rtx_ssrc);
357     g_hash_table_insert (rtx->ssrc_data, GUINT_TO_POINTER (ssrc), data);
358     g_hash_table_insert (rtx->rtx_ssrcs, GUINT_TO_POINTER (rtx_ssrc),
359         GUINT_TO_POINTER (ssrc));
360   } else {
361     data = g_hash_table_lookup (rtx->ssrc_data, GUINT_TO_POINTER (ssrc));
362   }
363   return data;
364 }
365
366 /* Copy fixed header and extension. Add OSN before to copy payload
367  * Copy memory to avoid to manually copy each rtp buffer field.
368  */
369 static GstBuffer *
370 gst_rtp_rtx_buffer_new (GstRtpRtxSend * rtx, GstBuffer * buffer)
371 {
372   GstMemory *mem = NULL;
373   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
374   GstRTPBuffer new_rtp = GST_RTP_BUFFER_INIT;
375   GstBuffer *new_buffer = gst_buffer_new ();
376   GstMapInfo map;
377   guint payload_len = 0;
378   SSRCRtxData *data;
379   guint32 ssrc;
380   guint16 seqnum;
381   guint8 fmtp;
382
383   gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
384
385   /* get needed data from GstRtpRtxSend */
386   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
387   data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
388   ssrc = data->rtx_ssrc;
389   seqnum = data->next_seqnum++;
390   fmtp = GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_pt_map,
391           GUINT_TO_POINTER (gst_rtp_buffer_get_payload_type (&rtp))));
392
393   GST_DEBUG_OBJECT (rtx,
394       "retransmit seqnum: %" G_GUINT16_FORMAT ", ssrc: %" G_GUINT32_FORMAT,
395       seqnum, ssrc);
396
397   /* gst_rtp_buffer_map does not map the payload so do it now */
398   gst_rtp_buffer_get_payload (&rtp);
399
400   /* copy fixed header */
401   mem = gst_memory_copy (rtp.map[0].memory, 0, rtp.size[0]);
402   gst_buffer_append_memory (new_buffer, mem);
403
404   /* copy extension if any */
405   if (rtp.size[1]) {
406     mem = gst_memory_copy (rtp.map[1].memory, 0, rtp.size[1]);
407     gst_buffer_append_memory (new_buffer, mem);
408   }
409
410   /* copy payload and add OSN just before */
411   payload_len = 2 + rtp.size[2];
412   mem = gst_allocator_alloc (NULL, payload_len, NULL);
413
414   gst_memory_map (mem, &map, GST_MAP_WRITE);
415   GST_WRITE_UINT16_BE (map.data, gst_rtp_buffer_get_seq (&rtp));
416   if (rtp.size[2])
417     memcpy (map.data + 2, rtp.data[2], rtp.size[2]);
418   gst_memory_unmap (mem, &map);
419   gst_buffer_append_memory (new_buffer, mem);
420
421   /* everything needed is copied */
422   gst_rtp_buffer_unmap (&rtp);
423
424   /* set ssrc, seqnum and fmtp */
425   gst_rtp_buffer_map (new_buffer, GST_MAP_WRITE, &new_rtp);
426   gst_rtp_buffer_set_ssrc (&new_rtp, ssrc);
427   gst_rtp_buffer_set_seq (&new_rtp, seqnum);
428   gst_rtp_buffer_set_payload_type (&new_rtp, fmtp);
429   /* RFC 4588: let other elements do the padding, as normal */
430   gst_rtp_buffer_set_padding (&new_rtp, FALSE);
431   gst_rtp_buffer_unmap (&new_rtp);
432
433   /* Copy over timestamps */
434   gst_buffer_copy_into (new_buffer, buffer, GST_BUFFER_COPY_TIMESTAMPS, 0, -1);
435
436   return new_buffer;
437 }
438
439 static gint
440 buffer_queue_items_cmp (BufferQueueItem * a, BufferQueueItem * b,
441     gpointer user_data)
442 {
443   /* gst_rtp_buffer_compare_seqnum returns the opposite of what we want,
444    * it returns negative when seqnum1 > seqnum2 and we want negative
445    * when b > a, i.e. a is smaller, so it comes first in the sequence */
446   return gst_rtp_buffer_compare_seqnum (b->seqnum, a->seqnum);
447 }
448
449 static gboolean
450 gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
451 {
452   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
453   gboolean res;
454
455   switch (GST_EVENT_TYPE (event)) {
456     case GST_EVENT_CUSTOM_UPSTREAM:
457     {
458       const GstStructure *s = gst_event_get_structure (event);
459
460       /* This event usually comes from the downstream gstrtpsession */
461       if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
462         guint seqnum = 0;
463         guint ssrc = 0;
464         GstBuffer *rtx_buf = NULL;
465
466         /* retrieve seqnum of the packet that need to be retransmitted */
467         if (!gst_structure_get_uint (s, "seqnum", &seqnum))
468           seqnum = -1;
469
470         /* retrieve ssrc of the packet that need to be retransmitted */
471         if (!gst_structure_get_uint (s, "ssrc", &ssrc))
472           ssrc = -1;
473
474         GST_DEBUG_OBJECT (rtx,
475             "request seqnum: %" G_GUINT32_FORMAT ", ssrc: %" G_GUINT32_FORMAT,
476             seqnum, ssrc);
477
478         GST_OBJECT_LOCK (rtx);
479         /* check if request is for us */
480         if (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc))) {
481           SSRCRtxData *data;
482           GSequenceIter *iter;
483           BufferQueueItem search_item;
484
485           /* update statistics */
486           ++rtx->num_rtx_requests;
487
488           data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
489
490           search_item.seqnum = seqnum;
491           iter = g_sequence_lookup (data->queue, &search_item,
492               (GCompareDataFunc) buffer_queue_items_cmp, NULL);
493           if (iter) {
494             BufferQueueItem *item = g_sequence_get (iter);
495             GST_DEBUG_OBJECT (rtx, "found %" G_GUINT16_FORMAT, item->seqnum);
496             rtx_buf = gst_rtp_rtx_buffer_new (rtx, item->buffer);
497           }
498         }
499         GST_OBJECT_UNLOCK (rtx);
500
501         if (rtx_buf)
502           gst_rtp_rtx_send_push_out (rtx, rtx_buf);
503
504         gst_event_unref (event);
505         res = TRUE;
506
507         /* This event usually comes from the downstream gstrtpsession */
508       } else if (gst_structure_has_name (s, "GstRTPCollision")) {
509         guint ssrc = 0;
510
511         if (!gst_structure_get_uint (s, "ssrc", &ssrc))
512           ssrc = -1;
513
514         GST_DEBUG_OBJECT (rtx, "collision ssrc: %" G_GUINT32_FORMAT, ssrc);
515
516         GST_OBJECT_LOCK (rtx);
517
518         /* choose another ssrc for our retransmited stream */
519         if (g_hash_table_contains (rtx->rtx_ssrcs, GUINT_TO_POINTER (ssrc))) {
520           guint master_ssrc;
521           SSRCRtxData *data;
522
523           master_ssrc = GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_ssrcs,
524                   GUINT_TO_POINTER (ssrc)));
525           data = gst_rtp_rtx_send_get_ssrc_data (rtx, master_ssrc);
526
527           /* change rtx_ssrc and update the reverse map */
528           data->rtx_ssrc = gst_rtp_rtx_send_choose_ssrc (rtx, 0, FALSE);
529           g_hash_table_remove (rtx->rtx_ssrcs, GUINT_TO_POINTER (ssrc));
530           g_hash_table_insert (rtx->rtx_ssrcs,
531               GUINT_TO_POINTER (data->rtx_ssrc),
532               GUINT_TO_POINTER (master_ssrc));
533
534           GST_OBJECT_UNLOCK (rtx);
535
536           /* no need to forward to payloader because we make sure to have
537            * a different ssrc
538            */
539           gst_event_unref (event);
540           res = TRUE;
541         } else {
542           /* if master ssrc has collided, remove it from our data, as it
543            * is not going to be used any longer */
544           if (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc))) {
545             SSRCRtxData *data;
546             data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
547             g_hash_table_remove (rtx->rtx_ssrcs,
548                 GUINT_TO_POINTER (data->rtx_ssrc));
549             g_hash_table_remove (rtx->ssrc_data, GUINT_TO_POINTER (ssrc));
550           }
551
552           GST_OBJECT_UNLOCK (rtx);
553
554           /* forward event to payloader in case collided ssrc is
555            * master stream */
556           res = gst_pad_event_default (pad, parent, event);
557         }
558       } else {
559         res = gst_pad_event_default (pad, parent, event);
560       }
561       break;
562     }
563     default:
564       res = gst_pad_event_default (pad, parent, event);
565       break;
566   }
567   return res;
568 }
569
570 static gboolean
571 gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
572 {
573   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
574
575   switch (GST_EVENT_TYPE (event)) {
576     case GST_EVENT_FLUSH_START:
577       gst_pad_push_event (rtx->srcpad, event);
578       gst_rtp_rtx_send_set_flushing (rtx, TRUE);
579       gst_pad_pause_task (rtx->srcpad);
580       return TRUE;
581     case GST_EVENT_FLUSH_STOP:
582       gst_pad_push_event (rtx->srcpad, event);
583       gst_rtp_rtx_send_set_flushing (rtx, FALSE);
584       gst_pad_start_task (rtx->srcpad,
585           (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
586       return TRUE;
587     case GST_EVENT_EOS:
588       GST_INFO_OBJECT (rtx, "Got EOS - enqueueing it");
589       gst_rtp_rtx_send_push_out (rtx, event);
590       return TRUE;
591     case GST_EVENT_CAPS:
592     {
593       GstCaps *caps;
594       GstStructure *s;
595       guint ssrc;
596       SSRCRtxData *data;
597
598       gst_event_parse_caps (event, &caps);
599       g_assert (gst_caps_is_fixed (caps));
600
601       s = gst_caps_get_structure (caps, 0);
602       if (!gst_structure_get_uint (s, "ssrc", &ssrc))
603         ssrc = -1;
604
605       GST_OBJECT_LOCK (rtx);
606       data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
607       gst_structure_get_int (s, "clock-rate", &data->clock_rate);
608
609       /* The session might need to know the RTX ssrc */
610       caps = gst_caps_copy (caps);
611       gst_caps_set_simple (caps, "rtx-ssrc", G_TYPE_UINT, data->rtx_ssrc, NULL);
612
613       GST_DEBUG_OBJECT (rtx, "got clock-rate from caps: %d for ssrc: %u",
614           data->clock_rate, ssrc);
615       GST_OBJECT_UNLOCK (rtx);
616
617       gst_event_unref (event);
618       event = gst_event_new_caps (caps);
619       gst_caps_unref (caps);
620       break;
621     }
622     default:
623       break;
624   }
625   return gst_pad_event_default (pad, parent, event);
626 }
627
628 /* like rtp_jitter_buffer_get_ts_diff() */
629 static guint32
630 gst_rtp_rtx_send_get_ts_diff (SSRCRtxData * data)
631 {
632   guint64 high_ts, low_ts;
633   BufferQueueItem *high_buf, *low_buf;
634   guint32 result;
635
636   high_buf =
637       g_sequence_get (g_sequence_iter_prev (g_sequence_get_end_iter
638           (data->queue)));
639   low_buf = g_sequence_get (g_sequence_get_begin_iter (data->queue));
640
641   if (!high_buf || !low_buf || high_buf == low_buf)
642     return 0;
643
644   high_ts = high_buf->timestamp;
645   low_ts = low_buf->timestamp;
646
647   /* it needs to work if ts wraps */
648   if (high_ts >= low_ts) {
649     result = (guint32) (high_ts - low_ts);
650   } else {
651     result = (guint32) (high_ts + G_MAXUINT32 + 1 - low_ts);
652   }
653
654   /* return value in ms instead of clock ticks */
655   return (guint32) gst_util_uint64_scale_int (result, 1000, data->clock_rate);
656 }
657
658 /* Must be called with lock */
659 static void
660 process_buffer (GstRtpRtxSend * rtx, GstBuffer * buffer)
661 {
662   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
663   BufferQueueItem *item;
664   SSRCRtxData *data;
665   guint16 seqnum;
666   guint8 payload_type;
667   guint32 ssrc, rtptime;
668
669   /* read the information we want from the buffer */
670   gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
671   seqnum = gst_rtp_buffer_get_seq (&rtp);
672   payload_type = gst_rtp_buffer_get_payload_type (&rtp);
673   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
674   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
675   gst_rtp_buffer_unmap (&rtp);
676
677   GST_LOG_OBJECT (rtx,
678       "Processing buffer seqnum: %" G_GUINT16_FORMAT ", ssrc: %"
679       G_GUINT32_FORMAT, seqnum, ssrc);
680
681   /* do not store the buffer if it's payload type is unknown */
682   if (g_hash_table_contains (rtx->rtx_pt_map, GUINT_TO_POINTER (payload_type))) {
683     data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
684
685     /* add current rtp buffer to queue history */
686     item = g_slice_new0 (BufferQueueItem);
687     item->seqnum = seqnum;
688     item->timestamp = rtptime;
689     item->buffer = gst_buffer_ref (buffer);
690     g_sequence_append (data->queue, item);
691
692     /* remove oldest packets from history if they are too many */
693     if (rtx->max_size_packets) {
694       while (g_sequence_get_length (data->queue) > rtx->max_size_packets)
695         g_sequence_remove (g_sequence_get_begin_iter (data->queue));
696     }
697     if (rtx->max_size_time) {
698       while (gst_rtp_rtx_send_get_ts_diff (data) > rtx->max_size_time)
699         g_sequence_remove (g_sequence_get_begin_iter (data->queue));
700     }
701   }
702 }
703
704 static GstFlowReturn
705 gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
706 {
707   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
708   GstFlowReturn ret;
709
710   GST_OBJECT_LOCK (rtx);
711   process_buffer (rtx, buffer);
712   GST_OBJECT_UNLOCK (rtx);
713   ret = gst_pad_push (rtx->srcpad, buffer);
714
715   return ret;
716 }
717
718 static gboolean
719 process_buffer_from_list (GstBuffer ** buffer, guint idx, gpointer user_data)
720 {
721   process_buffer (user_data, *buffer);
722   return TRUE;
723 }
724
725 static GstFlowReturn
726 gst_rtp_rtx_send_chain_list (GstPad * pad, GstObject * parent,
727     GstBufferList * list)
728 {
729   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
730   GstFlowReturn ret;
731
732   GST_OBJECT_LOCK (rtx);
733   gst_buffer_list_foreach (list, process_buffer_from_list, rtx);
734   GST_OBJECT_UNLOCK (rtx);
735
736   ret = gst_pad_push_list (rtx->srcpad, list);
737
738   return ret;
739 }
740
741 static void
742 gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx)
743 {
744   GstDataQueueItem *data;
745
746   if (gst_data_queue_pop (rtx->queue, &data)) {
747     GST_LOG_OBJECT (rtx, "pushing rtx buffer %p", data->object);
748
749     if (G_LIKELY (GST_IS_BUFFER (data->object))) {
750       gst_pad_push (rtx->srcpad, GST_BUFFER (data->object));
751
752       GST_OBJECT_LOCK (rtx);
753       rtx->num_rtx_packets++;
754       GST_OBJECT_UNLOCK (rtx);
755     } else if (GST_IS_EVENT (data->object)) {
756       gst_pad_push_event (rtx->srcpad, GST_EVENT (data->object));
757
758       /* after EOS, we should not send any more buffers,
759        * even if there are more requests coming in */
760       if (GST_EVENT_TYPE (data->object) == GST_EVENT_EOS) {
761         gst_rtp_rtx_send_set_flushing (rtx, TRUE);
762       }
763     } else {
764       g_assert_not_reached ();
765     }
766
767     data->object = NULL;        /* we no longer own that object */
768     data->destroy (data);
769   } else {
770     GST_LOG_OBJECT (rtx, "flushing");
771     gst_pad_pause_task (rtx->srcpad);
772   }
773 }
774
775 static gboolean
776 gst_rtp_rtx_send_activate_mode (GstPad * pad, GstObject * parent,
777     GstPadMode mode, gboolean active)
778 {
779   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
780   gboolean ret = FALSE;
781
782   switch (mode) {
783     case GST_PAD_MODE_PUSH:
784       if (active) {
785         gst_rtp_rtx_send_set_flushing (rtx, FALSE);
786         ret = gst_pad_start_task (rtx->srcpad,
787             (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
788       } else {
789         gst_rtp_rtx_send_set_flushing (rtx, TRUE);
790         ret = gst_pad_stop_task (rtx->srcpad);
791       }
792       GST_INFO_OBJECT (rtx, "activate_mode: active %d, ret %d", active, ret);
793       break;
794     default:
795       break;
796   }
797   return ret;
798 }
799
800 static void
801 gst_rtp_rtx_send_get_property (GObject * object,
802     guint prop_id, GValue * value, GParamSpec * pspec)
803 {
804   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (object);
805
806   switch (prop_id) {
807     case PROP_PAYLOAD_TYPE_MAP:
808       GST_OBJECT_LOCK (rtx);
809       g_value_set_boxed (value, rtx->rtx_pt_map_structure);
810       GST_OBJECT_UNLOCK (rtx);
811       break;
812     case PROP_MAX_SIZE_TIME:
813       GST_OBJECT_LOCK (rtx);
814       g_value_set_uint (value, rtx->max_size_time);
815       GST_OBJECT_UNLOCK (rtx);
816       break;
817     case PROP_MAX_SIZE_PACKETS:
818       GST_OBJECT_LOCK (rtx);
819       g_value_set_uint (value, rtx->max_size_packets);
820       GST_OBJECT_UNLOCK (rtx);
821       break;
822     case PROP_NUM_RTX_REQUESTS:
823       GST_OBJECT_LOCK (rtx);
824       g_value_set_uint (value, rtx->num_rtx_requests);
825       GST_OBJECT_UNLOCK (rtx);
826       break;
827     case PROP_NUM_RTX_PACKETS:
828       GST_OBJECT_LOCK (rtx);
829       g_value_set_uint (value, rtx->num_rtx_packets);
830       GST_OBJECT_UNLOCK (rtx);
831       break;
832     default:
833       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
834       break;
835   }
836 }
837
838 static gboolean
839 structure_to_hash_table (GQuark field_id, const GValue * value, gpointer hash)
840 {
841   const gchar *field_str;
842   guint field_uint;
843   guint value_uint;
844
845   field_str = g_quark_to_string (field_id);
846   field_uint = atoi (field_str);
847   value_uint = g_value_get_uint (value);
848   g_hash_table_insert ((GHashTable *) hash, GUINT_TO_POINTER (field_uint),
849       GUINT_TO_POINTER (value_uint));
850
851   return TRUE;
852 }
853
854 static void
855 gst_rtp_rtx_send_set_property (GObject * object,
856     guint prop_id, const GValue * value, GParamSpec * pspec)
857 {
858   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (object);
859
860   switch (prop_id) {
861     case PROP_SSRC_MAP:
862       GST_OBJECT_LOCK (rtx);
863       if (rtx->external_ssrc_map)
864         gst_structure_free (rtx->external_ssrc_map);
865       rtx->external_ssrc_map = g_value_dup_boxed (value);
866       GST_OBJECT_UNLOCK (rtx);
867       break;
868     case PROP_PAYLOAD_TYPE_MAP:
869       GST_OBJECT_LOCK (rtx);
870       if (rtx->rtx_pt_map_structure)
871         gst_structure_free (rtx->rtx_pt_map_structure);
872       rtx->rtx_pt_map_structure = g_value_dup_boxed (value);
873       g_hash_table_remove_all (rtx->rtx_pt_map);
874       gst_structure_foreach (rtx->rtx_pt_map_structure, structure_to_hash_table,
875           rtx->rtx_pt_map);
876       GST_OBJECT_UNLOCK (rtx);
877       break;
878     case PROP_MAX_SIZE_TIME:
879       GST_OBJECT_LOCK (rtx);
880       rtx->max_size_time = g_value_get_uint (value);
881       GST_OBJECT_UNLOCK (rtx);
882       break;
883     case PROP_MAX_SIZE_PACKETS:
884       GST_OBJECT_LOCK (rtx);
885       rtx->max_size_packets = g_value_get_uint (value);
886       GST_OBJECT_UNLOCK (rtx);
887       break;
888     default:
889       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
890       break;
891   }
892 }
893
894 static GstStateChangeReturn
895 gst_rtp_rtx_send_change_state (GstElement * element, GstStateChange transition)
896 {
897   GstStateChangeReturn ret;
898   GstRtpRtxSend *rtx;
899
900   rtx = GST_RTP_RTX_SEND (element);
901
902   switch (transition) {
903     default:
904       break;
905   }
906
907   ret =
908       GST_ELEMENT_CLASS (gst_rtp_rtx_send_parent_class)->change_state (element,
909       transition);
910
911   switch (transition) {
912     case GST_STATE_CHANGE_PAUSED_TO_READY:
913       gst_rtp_rtx_send_reset (rtx);
914       break;
915     default:
916       break;
917   }
918
919   return ret;
920 }
921
922 gboolean
923 gst_rtp_rtx_send_plugin_init (GstPlugin * plugin)
924 {
925   GST_DEBUG_CATEGORY_INIT (gst_rtp_rtx_send_debug, "rtprtxsend", 0,
926       "rtp retransmission sender");
927
928   return gst_element_register (plugin, "rtprtxsend", GST_RANK_NONE,
929       GST_TYPE_RTP_RTX_SEND);
930 }