Merge the tizen patch and fix build err based on 1.12.2
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtprtxreceive.c
1 /* RTP Retransmission receiver element for GStreamer
2  *
3  * gstrtprtxreceive.c:
4  *
5  * Copyright (C) 2013 Collabora Ltd.
6  *   @author Julien Isorce <julien.isorce@collabora.co.uk>
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23
24 /**
25  * SECTION:element-rtprtxreceive
26  * @see_also: rtprtxsend, rtpsession, rtpjitterbuffer
27  *
28  * rtprtxreceive listens to the retransmission events from the
29  * downstream rtpjitterbuffer and remembers the SSRC (ssrc1) of the stream and
30  * the sequence number that was requested. When it receives a packet with
31  * a sequence number equal to one of the ones stored and with a different SSRC,
32  * it identifies the new SSRC (ssrc2) as the retransmission stream of ssrc1.
33  * From this point on, it replaces ssrc2 with ssrc1 in all packets of the
34  * ssrc2 stream and flags them as retransmissions, so that rtpjitterbuffer
35  * can reconstruct the original stream.
36  *
37  * This algorithm is implemented as specified in RFC 4588.
38  *
39  * This element is meant to be used with rtprtxsend on the sender side.
40  * See #GstRtpRtxSend
41  *
42  * Below you can see some examples that illustrate how rtprtxreceive and
43  * rtprtxsend fit among the other rtp elements and how they work internally.
44  * Normally, hoewever, you should avoid using such pipelines and use
45  * rtpbin instead, with its #GstRtpBin::request-aux-sender and
46  * #GstRtpBin::request-aux-receiver signals. See #GstRtpBin.
47  *
48  * # Example pipelines
49  * |[
50  * gst-launch-1.0 rtpsession name=rtpsession rtp-profile=avpf \
51  *     audiotestsrc is-live=true ! opusenc ! rtpopuspay pt=96 ! \
52  *         rtprtxsend payload-type-map="application/x-rtp-pt-map,96=(uint)97" ! \
53  *         rtpsession.send_rtp_sink \
54  *     rtpsession.send_rtp_src ! identity drop-probability=0.01 ! \
55  *         udpsink host="127.0.0.1" port=5000 \
56  *     udpsrc port=5001 ! rtpsession.recv_rtcp_sink \
57  *     rtpsession.send_rtcp_src ! udpsink host="127.0.0.1" port=5002 \
58  *         sync=false async=false
59  * ]| Send audio stream through port 5000 (5001 and 5002 are just the rtcp
60  * link with the receiver)
61  * |[
62  * gst-launch-1.0 rtpsession name=rtpsession rtp-profile=avpf \
63  *     udpsrc port=5000 caps="application/x-rtp,media=(string)audio,clock-rate=(int)48000,encoding-name=(string)OPUS,payload=(int)96" ! \
64  *         rtpsession.recv_rtp_sink \
65  *     rtpsession.recv_rtp_src ! \
66  *         rtprtxreceive payload-type-map="application/x-rtp-pt-map,96=(uint)97" ! \
67  *         rtpssrcdemux ! rtpjitterbuffer do-retransmission=true ! \
68  *         rtpopusdepay ! opusdec ! audioconvert ! audioresample ! autoaudiosink \
69  *     rtpsession.send_rtcp_src ! \
70  *         udpsink host="127.0.0.1" port=5001 sync=false async=false \
71  *     udpsrc port=5002 ! rtpsession.recv_rtcp_sink
72  * ]| Receive audio stream from port 5000 (5001 and 5002 are just the rtcp
73  * link with the sender)
74  *
75  * In this example we can see a simple streaming of an OPUS stream with some
76  * of the packets being artificially dropped by the identity element.
77  * Thanks to retransmission, you should still hear a clear sound when setting
78  * drop-probability to something greater than 0.
79  *
80  * Internally, the rtpjitterbuffer will generate a custom upstream event,
81  * GstRTPRetransmissionRequest, when it detects that one packet is missing.
82  * Then this request is translated to a FB NACK in the rtcp link by rtpsession.
83  * Finally the rtpsession of the sender side will re-convert it in a
84  * GstRTPRetransmissionRequest that will be handled by rtprtxsend. rtprtxsend
85  * will then re-send the missing packet with a new srrc and a different payload
86  * type (here, 97), but with the same original sequence number. On the receiver
87  * side, rtprtxreceive will associate this new stream with the original and
88  * forward the retransmission packets to rtpjitterbuffer with the original
89  * ssrc and payload type.
90  *
91  * |[
92  * gst-launch-1.0 rtpsession name=rtpsession rtp-profile=avpf \
93  *     audiotestsrc is-live=true ! opusenc ! rtpopuspay pt=97 seqnum-offset=1 ! \
94  *         rtprtxsend payload-type-map="application/x-rtp-pt-map,97=(uint)99" ! \
95  *         funnel name=f ! rtpsession.send_rtp_sink \
96  *     audiotestsrc freq=660.0 is-live=true ! opusenc ! \
97  *         rtpopuspay pt=97 seqnum-offset=100 ! \
98  *         rtprtxsend payload-type-map="application/x-rtp-pt-map,97=(uint)99" ! \
99  *         f. \
100  *     rtpsession.send_rtp_src ! identity drop-probability=0.01 ! \
101  *         udpsink host="127.0.0.1" port=5000 \
102  *     udpsrc port=5001 ! rtpsession.recv_rtcp_sink \
103  *     rtpsession.send_rtcp_src ! udpsink host="127.0.0.1" port=5002 \
104  *         sync=false async=false
105  * ]| Send two audio streams to port 5000.
106  * |[
107  * gst-launch-1.0 rtpsession name=rtpsession rtp-profile=avpf \
108  *     udpsrc port=5000 caps="application/x-rtp,media=(string)audio,clock-rate=(int)48000,encoding-name=(string)OPUS,payload=(int)97" ! \
109  *         rtpsession.recv_rtp_sink \
110  *     rtpsession.recv_rtp_src ! \
111  *         rtprtxreceive payload-type-map="application/x-rtp-pt-map,97=(uint)99" ! \
112  *         rtpssrcdemux name=demux \
113  *     demux. ! queue ! rtpjitterbuffer do-retransmission=true ! rtpopusdepay ! \
114  *         opusdec ! audioconvert ! autoaudiosink \
115  *     demux. ! queue ! rtpjitterbuffer do-retransmission=true ! rtpopusdepay ! \
116  *         opusdec ! audioconvert ! autoaudiosink \
117  *     udpsrc port=5002 ! rtpsession.recv_rtcp_sink \
118  *     rtpsession.send_rtcp_src ! udpsink host="127.0.0.1" port=5001 \
119  *         sync=false async=false
120  * ]| Receive two audio streams from port 5000.
121  *
122  * In this example we are streaming two streams of the same type through the
123  * same port. They, however, are using a different SSRC (ssrc is randomly
124  * generated on each payloader - rtpopuspay in this example), so they can be
125  * identified and demultiplexed by rtpssrcdemux on the receiver side. This is
126  * an example of SSRC-multiplexing.
127  *
128  * It is important here to use a different starting sequence number
129  * (seqnum-offset), since this is the only means of identification that
130  * rtprtxreceive uses the very first time to identify retransmission streams.
131  * It is an error, according to RFC4588 to have two retransmission requests for
132  * packets belonging to two different streams but with the same sequence number.
133  * Note that the default seqnum-offset value (-1, which means random) would
134  * work just fine, but it is overriden here for illustration purposes.
135  */
136
137 #ifdef HAVE_CONFIG_H
138 #include "config.h"
139 #endif
140
141 #include <gst/gst.h>
142 #include <gst/rtp/gstrtpbuffer.h>
143 #include <string.h>
144 #include <stdlib.h>
145
146 #include "gstrtprtxreceive.h"
147
148 #define ASSOC_TIMEOUT (GST_SECOND)
149
150 GST_DEBUG_CATEGORY_STATIC (gst_rtp_rtx_receive_debug);
151 #define GST_CAT_DEFAULT gst_rtp_rtx_receive_debug
152
153 enum
154 {
155   PROP_0,
156   PROP_PAYLOAD_TYPE_MAP,
157   PROP_NUM_RTX_REQUESTS,
158   PROP_NUM_RTX_PACKETS,
159   PROP_NUM_RTX_ASSOC_PACKETS
160 };
161
162 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
163     GST_PAD_SRC,
164     GST_PAD_ALWAYS,
165     GST_STATIC_CAPS ("application/x-rtp")
166     );
167
168 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink",
169     GST_PAD_SINK,
170     GST_PAD_ALWAYS,
171     GST_STATIC_CAPS ("application/x-rtp")
172     );
173
174 static gboolean gst_rtp_rtx_receive_src_event (GstPad * pad, GstObject * parent,
175     GstEvent * event);
176 static GstFlowReturn gst_rtp_rtx_receive_chain (GstPad * pad,
177     GstObject * parent, GstBuffer * buffer);
178
179 static GstStateChangeReturn gst_rtp_rtx_receive_change_state (GstElement *
180     element, GstStateChange transition);
181
182 static void gst_rtp_rtx_receive_set_property (GObject * object, guint prop_id,
183     const GValue * value, GParamSpec * pspec);
184 static void gst_rtp_rtx_receive_get_property (GObject * object, guint prop_id,
185     GValue * value, GParamSpec * pspec);
186 static void gst_rtp_rtx_receive_finalize (GObject * object);
187
188 G_DEFINE_TYPE (GstRtpRtxReceive, gst_rtp_rtx_receive, GST_TYPE_ELEMENT);
189
190 static void
191 gst_rtp_rtx_receive_class_init (GstRtpRtxReceiveClass * klass)
192 {
193   GObjectClass *gobject_class;
194   GstElementClass *gstelement_class;
195
196   gobject_class = (GObjectClass *) klass;
197   gstelement_class = (GstElementClass *) klass;
198
199   gobject_class->get_property = gst_rtp_rtx_receive_get_property;
200   gobject_class->set_property = gst_rtp_rtx_receive_set_property;
201   gobject_class->finalize = gst_rtp_rtx_receive_finalize;
202
203   g_object_class_install_property (gobject_class, PROP_PAYLOAD_TYPE_MAP,
204       g_param_spec_boxed ("payload-type-map", "Payload Type Map",
205           "Map of original payload types to their retransmission payload types",
206           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
207
208   g_object_class_install_property (gobject_class, PROP_NUM_RTX_REQUESTS,
209       g_param_spec_uint ("num-rtx-requests", "Num RTX Requests",
210           "Number of retransmission events received", 0, G_MAXUINT,
211           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
212
213   g_object_class_install_property (gobject_class, PROP_NUM_RTX_PACKETS,
214       g_param_spec_uint ("num-rtx-packets", "Num RTX Packets",
215           " Number of retransmission packets received", 0, G_MAXUINT,
216           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
217
218   g_object_class_install_property (gobject_class, PROP_NUM_RTX_ASSOC_PACKETS,
219       g_param_spec_uint ("num-rtx-assoc-packets",
220           "Num RTX Associated Packets", "Number of retransmission packets "
221           "correctly associated with retransmission requests", 0, G_MAXUINT,
222           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
223
224   gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
225   gst_element_class_add_static_pad_template (gstelement_class, &sink_factory);
226
227   gst_element_class_set_static_metadata (gstelement_class,
228       "RTP Retransmission receiver", "Codec",
229       "Receive retransmitted RTP packets according to RFC4588",
230       "Julien Isorce <julien.isorce@collabora.co.uk>");
231
232   gstelement_class->change_state =
233       GST_DEBUG_FUNCPTR (gst_rtp_rtx_receive_change_state);
234 }
235
236 static void
237 gst_rtp_rtx_receive_reset (GstRtpRtxReceive * rtx)
238 {
239   GST_OBJECT_LOCK (rtx);
240   g_hash_table_remove_all (rtx->ssrc2_ssrc1_map);
241   g_hash_table_remove_all (rtx->seqnum_ssrc1_map);
242   rtx->num_rtx_requests = 0;
243   rtx->num_rtx_packets = 0;
244   rtx->num_rtx_assoc_packets = 0;
245   GST_OBJECT_UNLOCK (rtx);
246 }
247
248 static void
249 gst_rtp_rtx_receive_finalize (GObject * object)
250 {
251   GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE (object);
252
253   g_hash_table_unref (rtx->ssrc2_ssrc1_map);
254   g_hash_table_unref (rtx->seqnum_ssrc1_map);
255   g_hash_table_unref (rtx->rtx_pt_map);
256   if (rtx->rtx_pt_map_structure)
257     gst_structure_free (rtx->rtx_pt_map_structure);
258
259   G_OBJECT_CLASS (gst_rtp_rtx_receive_parent_class)->finalize (object);
260 }
261
262 typedef struct
263 {
264   guint32 ssrc;
265   GstClockTime time;
266 } SsrcAssoc;
267
268 static SsrcAssoc *
269 ssrc_assoc_new (guint32 ssrc, GstClockTime time)
270 {
271   SsrcAssoc *assoc = g_slice_new (SsrcAssoc);
272
273   assoc->ssrc = ssrc;
274   assoc->time = time;
275
276   return assoc;
277 }
278
279 static void
280 ssrc_assoc_free (SsrcAssoc * assoc)
281 {
282   g_slice_free (SsrcAssoc, assoc);
283 }
284
285 static void
286 gst_rtp_rtx_receive_init (GstRtpRtxReceive * rtx)
287 {
288   GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx);
289
290   rtx->srcpad =
291       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
292           "src"), "src");
293   GST_PAD_SET_PROXY_CAPS (rtx->srcpad);
294   GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad);
295   gst_pad_set_event_function (rtx->srcpad,
296       GST_DEBUG_FUNCPTR (gst_rtp_rtx_receive_src_event));
297   gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad);
298
299   rtx->sinkpad =
300       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
301           "sink"), "sink");
302   GST_PAD_SET_PROXY_CAPS (rtx->sinkpad);
303   GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
304   gst_pad_set_chain_function (rtx->sinkpad,
305       GST_DEBUG_FUNCPTR (gst_rtp_rtx_receive_chain));
306   gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
307
308   rtx->ssrc2_ssrc1_map = g_hash_table_new (g_direct_hash, g_direct_equal);
309   rtx->seqnum_ssrc1_map = g_hash_table_new_full (g_direct_hash, g_direct_equal,
310       NULL, (GDestroyNotify) ssrc_assoc_free);
311
312   rtx->rtx_pt_map = g_hash_table_new (g_direct_hash, g_direct_equal);
313 }
314
315 static gboolean
316 gst_rtp_rtx_receive_src_event (GstPad * pad, GstObject * parent,
317     GstEvent * event)
318 {
319   GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE (parent);
320   gboolean res;
321
322   switch (GST_EVENT_TYPE (event)) {
323     case GST_EVENT_CUSTOM_UPSTREAM:
324     {
325       const GstStructure *s = gst_event_get_structure (event);
326
327       /* This event usually comes from the downstream gstrtpjitterbuffer */
328       if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
329         guint seqnum = 0;
330         guint ssrc = 0;
331         gpointer ssrc2 = 0;
332
333         /* retrieve seqnum of the packet that need to be retransmitted */
334         if (!gst_structure_get_uint (s, "seqnum", &seqnum))
335           seqnum = -1;
336
337         /* retrieve ssrc of the packet that need to be retransmitted
338          * it's useful when reconstructing the original packet from the rtx packet */
339         if (!gst_structure_get_uint (s, "ssrc", &ssrc))
340           ssrc = -1;
341
342         GST_DEBUG_OBJECT (rtx,
343             "request seqnum: %" G_GUINT32_FORMAT ", ssrc: %" G_GUINT32_FORMAT,
344             seqnum, ssrc);
345
346         GST_OBJECT_LOCK (rtx);
347
348         /* increase number of seen requests for our statistics */
349         ++rtx->num_rtx_requests;
350
351         /* First, we lookup in our map to see if we have already associate this
352          * master stream ssrc with its retransmitted stream.
353          * Every ssrc are unique so we can use the same hash table
354          * for both retrieving the ssrc1 from ssrc2 and also ssrc2 from ssrc1
355          */
356         if (g_hash_table_lookup_extended (rtx->ssrc2_ssrc1_map,
357                 GUINT_TO_POINTER (ssrc), NULL, &ssrc2)
358             && GPOINTER_TO_UINT (ssrc2) != GPOINTER_TO_UINT (ssrc)) {
359           GST_DEBUG_OBJECT (rtx, "Retransmited stream %" G_GUINT32_FORMAT
360               " already associated to its master", GPOINTER_TO_UINT (ssrc2));
361         } else {
362           SsrcAssoc *assoc;
363
364           /* not already associated but also we have to check that we have not
365            * already considered this request.
366            */
367           if (g_hash_table_lookup_extended (rtx->seqnum_ssrc1_map,
368                   GUINT_TO_POINTER (seqnum), NULL, (gpointer *) & assoc)) {
369             if (assoc->ssrc == ssrc) {
370               /* do nothing because we have already considered this request
371                * The jitter may be too impatient of the rtx packet has been
372                * lost too.
373                * It does not mean we reject the event, we still want to forward
374                * the request to the gstrtpsession to be translater into a FB NACK
375                */
376               GST_DEBUG_OBJECT (rtx, "Duplicated request seqnum: %"
377                   G_GUINT32_FORMAT ", ssrc1: %" G_GUINT32_FORMAT, seqnum, ssrc);
378             } else {
379
380               /* If the association attempt is larger than ASSOC_TIMEOUT,
381                * then we give up on it, and try this one.
382                */
383               if (!GST_CLOCK_TIME_IS_VALID (rtx->last_time) ||
384                   !GST_CLOCK_TIME_IS_VALID (assoc->time) ||
385                   assoc->time + ASSOC_TIMEOUT < rtx->last_time) {
386                 /* From RFC 4588:
387                  * the receiver MUST NOT have two outstanding requests for the
388                  * same packet sequence number in two different original streams
389                  * before the association is resolved. Otherwise it's impossible
390                  * to associate a rtx stream and its master stream
391                  */
392
393                 /* remove seqnum in order to reuse the spot */
394                 g_hash_table_remove (rtx->seqnum_ssrc1_map,
395                     GUINT_TO_POINTER (seqnum));
396                 goto retransmit;
397               } else {
398                 GST_DEBUG_OBJECT (rtx,
399                     "reject request for seqnum %" G_GUINT32_FORMAT
400                     " of master stream %" G_GUINT32_FORMAT, seqnum, ssrc);
401
402                 /* do not forward the event as we are rejecting this request */
403                 GST_OBJECT_UNLOCK (rtx);
404                 gst_event_unref (event);
405                 return TRUE;
406               }
407             }
408           } else {
409           retransmit:
410             /* the request has not been already considered
411              * insert it for the first time */
412             g_hash_table_insert (rtx->seqnum_ssrc1_map,
413                 GUINT_TO_POINTER (seqnum),
414                 ssrc_assoc_new (ssrc, rtx->last_time));
415           }
416         }
417
418         GST_DEBUG_OBJECT (rtx,
419             "packet number %" G_GUINT32_FORMAT " of master stream %"
420             G_GUINT32_FORMAT " needs to be retransmitted", seqnum, ssrc);
421
422         GST_OBJECT_UNLOCK (rtx);
423       }
424
425       /* Transfer event upstream so that the request can acutally by translated
426        * through gstrtpsession through the network */
427       res = gst_pad_event_default (pad, parent, event);
428       break;
429     }
430     default:
431       res = gst_pad_event_default (pad, parent, event);
432       break;
433   }
434   return res;
435 }
436
437 /* Copy fixed header and extension. Replace current ssrc by ssrc1,
438  * remove OSN and replace current seq num by OSN.
439  * Copy memory to avoid to manually copy each rtp buffer field.
440  */
441 static GstBuffer *
442 _gst_rtp_buffer_new_from_rtx (GstRTPBuffer * rtp, guint32 ssrc1,
443     guint16 orign_seqnum, guint8 origin_payload_type)
444 {
445   GstMemory *mem = NULL;
446   GstRTPBuffer new_rtp = GST_RTP_BUFFER_INIT;
447   GstBuffer *new_buffer = gst_buffer_new ();
448   GstMapInfo map;
449   guint payload_len = 0;
450
451   /* copy fixed header */
452   mem = gst_memory_copy (rtp->map[0].memory,
453       (guint8 *) rtp->data[0] - rtp->map[0].data, rtp->size[0]);
454   gst_buffer_append_memory (new_buffer, mem);
455
456   /* copy extension if any */
457   if (rtp->size[1]) {
458     mem = gst_memory_copy (rtp->map[1].memory,
459         (guint8 *) rtp->data[1] - rtp->map[1].data, rtp->size[1]);
460     gst_buffer_append_memory (new_buffer, mem);
461   }
462
463   /* copy payload and remove OSN */
464   payload_len = rtp->size[2] - 2;
465   mem = gst_allocator_alloc (NULL, payload_len, NULL);
466
467   gst_memory_map (mem, &map, GST_MAP_WRITE);
468   if (rtp->size[2])
469     memcpy (map.data, (guint8 *) rtp->data[2] + 2, payload_len);
470   gst_memory_unmap (mem, &map);
471   gst_buffer_append_memory (new_buffer, mem);
472
473   /* the sender always constructs rtx packets without padding,
474    * But the receiver can still receive rtx packets with padding.
475    * So just copy it.
476    */
477   if (rtp->size[3]) {
478     guint pad_len = rtp->size[3];
479
480     mem = gst_allocator_alloc (NULL, pad_len, NULL);
481
482     gst_memory_map (mem, &map, GST_MAP_WRITE);
483     map.data[pad_len - 1] = pad_len;
484     gst_memory_unmap (mem, &map);
485
486     gst_buffer_append_memory (new_buffer, mem);
487   }
488
489   /* set ssrc and seq num */
490   gst_rtp_buffer_map (new_buffer, GST_MAP_WRITE, &new_rtp);
491   gst_rtp_buffer_set_ssrc (&new_rtp, ssrc1);
492   gst_rtp_buffer_set_seq (&new_rtp, orign_seqnum);
493   gst_rtp_buffer_set_payload_type (&new_rtp, origin_payload_type);
494   gst_rtp_buffer_unmap (&new_rtp);
495
496   gst_buffer_copy_into (new_buffer, rtp->buffer,
497       GST_BUFFER_COPY_FLAGS | GST_BUFFER_COPY_TIMESTAMPS, 0, -1);
498   GST_BUFFER_FLAG_SET (new_buffer, GST_RTP_BUFFER_FLAG_RETRANSMISSION);
499
500   return new_buffer;
501 }
502
503 static GstFlowReturn
504 gst_rtp_rtx_receive_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
505 {
506   GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE (parent);
507   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
508   GstFlowReturn ret = GST_FLOW_OK;
509   GstBuffer *new_buffer = NULL;
510   guint32 ssrc = 0;
511   gpointer ssrc1 = 0;
512   guint32 ssrc2 = 0;
513   guint16 seqnum = 0;
514   guint16 orign_seqnum = 0;
515   guint8 payload_type = 0;
516   gpointer payload = NULL;
517   guint8 origin_payload_type = 0;
518   gboolean is_rtx;
519   gboolean drop = FALSE;
520
521   /* map current rtp packet to parse its header */
522   if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp))
523     goto invalid_buffer;
524
525   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
526   seqnum = gst_rtp_buffer_get_seq (&rtp);
527   payload_type = gst_rtp_buffer_get_payload_type (&rtp);
528
529   /* check if we have a retransmission packet (this information comes from SDP) */
530   GST_OBJECT_LOCK (rtx);
531
532   is_rtx =
533       g_hash_table_lookup_extended (rtx->rtx_pt_map,
534       GUINT_TO_POINTER (payload_type), NULL, NULL);
535
536   if (is_rtx) {
537     payload = gst_rtp_buffer_get_payload (&rtp);
538
539     if (!payload || gst_rtp_buffer_get_payload_len (&rtp) < 2) {
540       GST_OBJECT_UNLOCK (rtx);
541       gst_rtp_buffer_unmap (&rtp);
542       goto invalid_buffer;
543     }
544   }
545
546   rtx->last_time = GST_BUFFER_PTS (buffer);
547
548   if (g_hash_table_size (rtx->seqnum_ssrc1_map) > 0) {
549     GHashTableIter iter;
550     gpointer key, value;
551
552     g_hash_table_iter_init (&iter, rtx->seqnum_ssrc1_map);
553     while (g_hash_table_iter_next (&iter, &key, &value)) {
554       SsrcAssoc *assoc = value;
555
556       /* remove association request if it is too old */
557       if (GST_CLOCK_TIME_IS_VALID (rtx->last_time) &&
558           GST_CLOCK_TIME_IS_VALID (assoc->time) &&
559           assoc->time + ASSOC_TIMEOUT < rtx->last_time) {
560         g_hash_table_iter_remove (&iter);
561       }
562     }
563   }
564
565   /* if the current packet is from a retransmission stream */
566   if (is_rtx) {
567     /* increase our statistic */
568     ++rtx->num_rtx_packets;
569
570     /* read OSN in the rtx payload */
571     orign_seqnum = GST_READ_UINT16_BE (gst_rtp_buffer_get_payload (&rtp));
572     origin_payload_type =
573         GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_pt_map,
574             GUINT_TO_POINTER (payload_type)));
575
576     /* first we check if we already have associated this retransmission stream
577      * to a master stream */
578     if (g_hash_table_lookup_extended (rtx->ssrc2_ssrc1_map,
579             GUINT_TO_POINTER (ssrc), NULL, &ssrc1)) {
580       GST_DEBUG_OBJECT (rtx,
581           "packet is from retransmission stream %" G_GUINT32_FORMAT
582           " already associated to master stream %" G_GUINT32_FORMAT, ssrc,
583           GPOINTER_TO_UINT (ssrc1));
584       ssrc2 = ssrc;
585     } else {
586       SsrcAssoc *assoc;
587
588       /* the current retransmitted packet has its rtx stream not already
589        * associated to a master stream, so retrieve it from our request
590        * history */
591       if (g_hash_table_lookup_extended (rtx->seqnum_ssrc1_map,
592               GUINT_TO_POINTER (orign_seqnum), NULL, (gpointer *) & assoc)) {
593         GST_DEBUG_OBJECT (rtx,
594             "associate retransmitted stream %" G_GUINT32_FORMAT
595             " to master stream %" G_GUINT32_FORMAT " thanks to packet %"
596             G_GUINT16_FORMAT "", ssrc, assoc->ssrc, orign_seqnum);
597         ssrc1 = GUINT_TO_POINTER (assoc->ssrc);
598         ssrc2 = ssrc;
599
600         /* just put a guard */
601         if (GPOINTER_TO_UINT (ssrc1) == ssrc2)
602           GST_WARNING_OBJECT (rtx, "RTX receiver ssrc2_ssrc1_map bad state, "
603               "ssrc %" G_GUINT32_FORMAT " are the same\n", ssrc);
604
605         /* free the spot so that this seqnum can be used to do another
606          * association */
607         g_hash_table_remove (rtx->seqnum_ssrc1_map,
608             GUINT_TO_POINTER (orign_seqnum));
609
610         /* actually do the association between rtx stream and master stream */
611         g_hash_table_insert (rtx->ssrc2_ssrc1_map, GUINT_TO_POINTER (ssrc2),
612             ssrc1);
613
614         /* also do the association between master stream and rtx stream
615          * every ssrc are unique so we can use the same hash table
616          * for both retrieving the ssrc1 from ssrc2 and also ssrc2 from ssrc1
617          */
618         g_hash_table_insert (rtx->ssrc2_ssrc1_map, ssrc1,
619             GUINT_TO_POINTER (ssrc2));
620
621       } else {
622         /* we are not able to associate this rtx packet with a master stream */
623         GST_DEBUG_OBJECT (rtx,
624             "drop rtx packet because its orign_seqnum %" G_GUINT16_FORMAT
625             " is not in pending retransmission requests", orign_seqnum);
626         drop = TRUE;
627       }
628     }
629   }
630
631   /* if not dropped the packet was successfully associated */
632   if (is_rtx && !drop)
633     ++rtx->num_rtx_assoc_packets;
634
635   GST_OBJECT_UNLOCK (rtx);
636
637   /* just drop the packet if the association could not have been made */
638   if (drop) {
639     gst_rtp_buffer_unmap (&rtp);
640     gst_buffer_unref (buffer);
641     return GST_FLOW_OK;
642   }
643
644   /* create the retransmission packet */
645   if (is_rtx)
646     new_buffer =
647         _gst_rtp_buffer_new_from_rtx (&rtp, GPOINTER_TO_UINT (ssrc1),
648         orign_seqnum, origin_payload_type);
649
650   gst_rtp_buffer_unmap (&rtp);
651
652   /* push the packet */
653   if (is_rtx) {
654     gst_buffer_unref (buffer);
655     GST_LOG_OBJECT (rtx, "push packet seqnum:%" G_GUINT16_FORMAT
656         " from a restransmission stream ssrc2:%" G_GUINT32_FORMAT " (src %"
657         G_GUINT32_FORMAT ")", orign_seqnum, ssrc2, GPOINTER_TO_UINT (ssrc1));
658     ret = gst_pad_push (rtx->srcpad, new_buffer);
659   } else {
660     GST_LOG_OBJECT (rtx, "push packet seqnum:%" G_GUINT16_FORMAT
661         " from a master stream ssrc: %" G_GUINT32_FORMAT, seqnum, ssrc);
662     ret = gst_pad_push (rtx->srcpad, buffer);
663   }
664
665   return ret;
666
667 invalid_buffer:
668   {
669     GST_ELEMENT_WARNING (rtx, STREAM, DECODE, (NULL),
670         ("Received invalid RTP payload, dropping"));
671     gst_buffer_unref (buffer);
672     return GST_FLOW_OK;
673   }
674 }
675
676 static void
677 gst_rtp_rtx_receive_get_property (GObject * object,
678     guint prop_id, GValue * value, GParamSpec * pspec)
679 {
680   GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE (object);
681
682   switch (prop_id) {
683     case PROP_PAYLOAD_TYPE_MAP:
684       GST_OBJECT_LOCK (rtx);
685       g_value_set_boxed (value, rtx->rtx_pt_map_structure);
686       GST_OBJECT_UNLOCK (rtx);
687       break;
688     case PROP_NUM_RTX_REQUESTS:
689       GST_OBJECT_LOCK (rtx);
690       g_value_set_uint (value, rtx->num_rtx_requests);
691       GST_OBJECT_UNLOCK (rtx);
692       break;
693     case PROP_NUM_RTX_PACKETS:
694       GST_OBJECT_LOCK (rtx);
695       g_value_set_uint (value, rtx->num_rtx_packets);
696       GST_OBJECT_UNLOCK (rtx);
697       break;
698     case PROP_NUM_RTX_ASSOC_PACKETS:
699       GST_OBJECT_LOCK (rtx);
700       g_value_set_uint (value, rtx->num_rtx_assoc_packets);
701       GST_OBJECT_UNLOCK (rtx);
702       break;
703     default:
704       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
705       break;
706   }
707 }
708
709 static gboolean
710 structure_to_hash_table_inv (GQuark field_id, const GValue * value,
711     gpointer hash)
712 {
713   const gchar *field_str;
714   guint field_uint;
715   guint value_uint;
716
717   field_str = g_quark_to_string (field_id);
718   field_uint = atoi (field_str);
719   value_uint = g_value_get_uint (value);
720   g_hash_table_insert ((GHashTable *) hash, GUINT_TO_POINTER (value_uint),
721       GUINT_TO_POINTER (field_uint));
722
723   return TRUE;
724 }
725
726 static void
727 gst_rtp_rtx_receive_set_property (GObject * object,
728     guint prop_id, const GValue * value, GParamSpec * pspec)
729 {
730   GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE (object);
731
732   switch (prop_id) {
733     case PROP_PAYLOAD_TYPE_MAP:
734       GST_OBJECT_LOCK (rtx);
735       if (rtx->rtx_pt_map_structure)
736         gst_structure_free (rtx->rtx_pt_map_structure);
737       rtx->rtx_pt_map_structure = g_value_dup_boxed (value);
738       g_hash_table_remove_all (rtx->rtx_pt_map);
739       gst_structure_foreach (rtx->rtx_pt_map_structure,
740           structure_to_hash_table_inv, rtx->rtx_pt_map);
741       GST_OBJECT_UNLOCK (rtx);
742       break;
743     default:
744       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
745       break;
746   }
747 }
748
749 static GstStateChangeReturn
750 gst_rtp_rtx_receive_change_state (GstElement * element,
751     GstStateChange transition)
752 {
753   GstStateChangeReturn ret;
754   GstRtpRtxReceive *rtx;
755
756   rtx = GST_RTP_RTX_RECEIVE (element);
757
758   switch (transition) {
759     default:
760       break;
761   }
762
763   ret =
764       GST_ELEMENT_CLASS (gst_rtp_rtx_receive_parent_class)->change_state
765       (element, transition);
766
767   switch (transition) {
768     case GST_STATE_CHANGE_PAUSED_TO_READY:
769       gst_rtp_rtx_receive_reset (rtx);
770       break;
771     default:
772       break;
773   }
774
775   return ret;
776 }
777
778 gboolean
779 gst_rtp_rtx_receive_plugin_init (GstPlugin * plugin)
780 {
781   GST_DEBUG_CATEGORY_INIT (gst_rtp_rtx_receive_debug, "rtprtxreceive", 0,
782       "rtp retransmission receiver");
783
784   return gst_element_register (plugin, "rtprtxreceive", GST_RANK_NONE,
785       GST_TYPE_RTP_RTX_RECEIVE);
786 }