rtxreceive: Put debug output for retransmission requests at the right place
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtprtxreceive.c
1 /* RTP Retransmission receiver element for GStreamer
2  *
3  * gstrtprtxreceive.c:
4  *
5  * Copyright (C) 2013 Collabora Ltd.
6  *   @author Julien Isorce <julien.isorce@collabora.co.uk>
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23
24 /**
25  * SECTION:element-rtprtxreceive
26  * @see_also: rtprtxsend, rtpsession, rtpjitterbuffer
27  *
28  * The receiver will listen to the custom retransmission events from the
29  * downstream jitterbuffer and will remember the SSRC1 of the stream and
30  * seqnum that was requested. When it sees a packet with one of the stored
31  * seqnum, it associates the SSRC2 of the stream with the SSRC1 of the
32  * master stream. From then it knows that SSRC2 is the retransmission
33  * stream of SSRC1. This algorithm is stated in RFC 4588. For this
34  * algorithm to work, RFC4588 also states that no two pending retransmission
35  * requests can exist for the same seqnum and different SSRCs or else it
36  * would be impossible to associate the retransmission with the original
37  * requester SSRC.
38  * When the RTX receiver has associated the retransmission packets,
39  * it can depayload and forward them to the source pad of the element.
40  * RTX is SSRC-multiplexed. See #GstRtpRtxSend
41  *
42  * <refsect2>
43  * <title>Example pipelines</title>
44  * |[
45  * gst-launch-1.0 rtpsession name=rtpsession \
46  *         audiotestsrc ! speexenc ! rtpspeexpay pt=97 ! rtprtxsend rtx-payload-type=99 ! \
47  *             identity drop-probability=0.1 ! rtpsession.send_rtp_sink \
48  *             rtpsession.send_rtp_src ! udpsink host="127.0.0.1" port=5000 \
49  *         udpsrc port=5001 ! rtpsession.recv_rtcp_sink \
50  *         rtpsession.send_rtcp_src ! udpsink host="127.0.0.1" port=5002 sync=false async=false
51  * ]| Send audio stream through port 5000. (5001 and 5002 are just the rtcp link with the receiver)
52  * |[
53  * gst-launch-1.0 rtpsession name=rtpsession \
54  *         udpsrc port=5000 caps="application/x-rtp,media=(string)audio,clock-rate=(int)44100,encoding-name=(string)SPEEX,encoding-params=(string)1,octet-align=(string)1" ! \
55  *             rtpsession.recv_rtp_sink \
56  *             rtpsession.recv_rtp_src ! rtprtxreceive rtx-payload-types="99" ! rtpjitterbuffer do-retransmission=true ! rtpspeexdepay ! \
57  *             speexdec ! audioconvert ! autoaudiosink \
58  *         rtpsession.send_rtcp_src ! udpsink host="127.0.0.1" port=5001 \
59  *         udpsrc port=5002 ! rtpsession.recv_rtcp_sink sync=fakse async=false
60  * ]| Receive audio stream from port 5000. (5001 and 5002 are just the rtcp link with the sender)
61  * On sender side make sure to use a different payload type for the stream and
62  * its associated retransmission stream (see #GstRtpRtxSend). Note that several retransmission streams can
63  * have the same payload type so this is not deterministic. Actually the
64  * rtprtxreceiver element does the association using seqnum values.
65  * On receiver side set all the retransmission payload types (Those informations are retrieve
66  * through SDP).
67  * You should still hear a clear sound when setting drop-probability to something greater than 0.
68  * The rtpjitterbuffer will generate a custom upstream event GstRTPRetransmissionRequest when
69  * it assumes that one packet is missing. Then this request is translated to a FB NACK in the rtcp link
70  * Finally the rtpsession of the sender side re-convert it in a GstRTPRetransmissionRequest that will
71  * be handle by rtprtxsend.
72  * When increasing this value it may be possible that even the retransmission stream would be dropped
73  * so the receiver will ask to resend the packets again and again until it actually receive them.
74  * If the value is too high the rtprtxsend will not be able to retrieve the packet in its list of
75  * stored packets. For learning purpose you could try to increase the max-size-packets or max-size-time
76  * rtprtxsender's properties.
77  * Also note that you should use rtprtxsend through rtpbin and its set-aux-send property. See #GstRtpBin.
78  * |[
79  * gst-launch-1.0 rtpsession name=rtpsession0 \
80  *         audiotestsrc wave=0 ! speexenc ! rtpspeexpay pt=97 ! rtprtxsend rtx-payload-type=99 seqnum-offset=1 ! \
81  *             identity drop-probability=0.1 ! rtpsession0.send_rtp_sink \
82  *             rtpsession0.send_rtp_src ! udpsink host="127.0.0.1" port=5000 \
83  *         udpsrc port=5001 ! rtpsession0.recv_rtcp_sink \
84  *         rtpsession0.send_rtcp_src ! udpsink host="127.0.0.1" port=5002 sync=false async=false \
85  *                rtpsession name=rtpsession1 \
86  *         audiotestsrc wave=0 ! speexenc ! rtpspeexpay pt=97 ! rtprtxsend rtx-payload-type=99 seqnum-offset=10 ! \
87  *             identity drop-probability=0.1 ! rtpsession1.send_rtp_sink \
88  *             rtpsession1.send_rtp_src ! udpsink host="127.0.0.1" port=5000 \
89  *         udpsrc port=5004 ! rtpsession1.recv_rtcp_sink \
90  *         rtpsession1.send_rtcp_src ! udpsink host="127.0.0.1" port=5002 sync=false async=false
91  * ]| Send two audio streams to port 5000.
92  * |[
93  * gst-launch-1.0 rtpsession name=rtpsession
94  *         udpsrc port=5000 caps="application/x-rtp,media=(string)audio,clock-rate=(int)44100,encoding-name=(string)SPEEX,encoding-params=(string)1,octet-align=(string)1" ! \
95  *             rtpsession.recv_rtp_sink \
96  *             rtpsession.recv_rtp_src ! rtprtxreceive rtx-payload-types="99" ! rtpssrcdemux name=demux \
97  *             demux. ! queue ! rtpjitterbuffer do-retransmission=true ! rtpspeexdepay ! speexdec ! audioconvert ! autoaudiosink \
98  *             demux. ! queue ! rtpjitterbuffer do-retransmission=true ! rtpspeexdepay ! speexdec ! audioconvert ! autoaudiosink \
99  *         rtpsession.send_rtcp_src ! ! tee name=t ! queue ! udpsink host="127.0.0.1" port=5001 t. ! queue ! udpsink host="127.0.0.1" port=5004 \
100  *         udpsrc port=5002 ! rtpsession.recv_rtcp_sink sync=fakse async=false
101  * ]| Receive audio stream from port 5000.
102  * On sender side the two streams have the same payload type for master streams, Same about retransmission streams.
103  * The streams are sent to the network through two distincts sessions.
104  * But we need to set a different seqnum-offset to make sure their seqnum navigate at a different rate like in concrete cases.
105  * We could also choose the same seqnum offset but we would require to set a different initial seqnum value.
106  * This is also why the rtprtxreceive can succeed to do the association between master and retransmission stream.
107  * On receiver side the same session is used to receive the two streams. So the rtpssrcdemux is here to demultiplex
108  * those two streams. The rtprtxreceive is responsible for reconstructing the original packets from the two retransmission streams.
109  * You can play with the drop-probability value for one or both streams.
110  * You should hear a clear sound. (after a few seconds the two streams wave feel synchronized)
111  * </refsect2>
112  */
113
114 #ifdef HAVE_CONFIG_H
115 #include "config.h"
116 #endif
117
118 #include <gst/gst.h>
119 #include <gst/rtp/gstrtpbuffer.h>
120 #include <string.h>
121 #include <stdlib.h>
122
123 #include "gstrtprtxreceive.h"
124
125 #define ASSOC_TIMEOUT (GST_SECOND)
126
127 GST_DEBUG_CATEGORY_STATIC (gst_rtp_rtx_receive_debug);
128 #define GST_CAT_DEFAULT gst_rtp_rtx_receive_debug
129
130 enum
131 {
132   PROP_0,
133   PROP_PAYLOAD_TYPE_MAP,
134   PROP_NUM_RTX_REQUESTS,
135   PROP_NUM_RTX_PACKETS,
136   PROP_NUM_RTX_ASSOC_PACKETS,
137   PROP_LAST
138 };
139
140 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
141     GST_PAD_SRC,
142     GST_PAD_ALWAYS,
143     GST_STATIC_CAPS ("application/x-rtp")
144     );
145
146 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink",
147     GST_PAD_SINK,
148     GST_PAD_ALWAYS,
149     GST_STATIC_CAPS ("application/x-rtp")
150     );
151
152 static gboolean gst_rtp_rtx_receive_src_event (GstPad * pad, GstObject * parent,
153     GstEvent * event);
154 static GstFlowReturn gst_rtp_rtx_receive_chain (GstPad * pad,
155     GstObject * parent, GstBuffer * buffer);
156
157 static GstStateChangeReturn gst_rtp_rtx_receive_change_state (GstElement *
158     element, GstStateChange transition);
159
160 static void gst_rtp_rtx_receive_set_property (GObject * object, guint prop_id,
161     const GValue * value, GParamSpec * pspec);
162 static void gst_rtp_rtx_receive_get_property (GObject * object, guint prop_id,
163     GValue * value, GParamSpec * pspec);
164 static void gst_rtp_rtx_receive_finalize (GObject * object);
165
166 G_DEFINE_TYPE (GstRtpRtxReceive, gst_rtp_rtx_receive, GST_TYPE_ELEMENT);
167
168 static void
169 gst_rtp_rtx_receive_class_init (GstRtpRtxReceiveClass * klass)
170 {
171   GObjectClass *gobject_class;
172   GstElementClass *gstelement_class;
173
174   gobject_class = (GObjectClass *) klass;
175   gstelement_class = (GstElementClass *) klass;
176
177   gobject_class->get_property = gst_rtp_rtx_receive_get_property;
178   gobject_class->set_property = gst_rtp_rtx_receive_set_property;
179   gobject_class->finalize = gst_rtp_rtx_receive_finalize;
180
181   g_object_class_install_property (gobject_class, PROP_PAYLOAD_TYPE_MAP,
182       g_param_spec_boxed ("payload-type-map", "Payload Type Map",
183           "Map of original payload types to their retransmission payload types",
184           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
185
186   g_object_class_install_property (gobject_class, PROP_NUM_RTX_REQUESTS,
187       g_param_spec_uint ("num-rtx-requests", "Num RTX Requests",
188           "Number of retransmission events received", 0, G_MAXUINT,
189           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
190
191   g_object_class_install_property (gobject_class, PROP_NUM_RTX_PACKETS,
192       g_param_spec_uint ("num-rtx-packets", "Num RTX Packets",
193           " Number of retransmission packets received", 0, G_MAXUINT,
194           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
195
196   g_object_class_install_property (gobject_class, PROP_NUM_RTX_ASSOC_PACKETS,
197       g_param_spec_uint ("num-rtx-assoc-packets",
198           "Num RTX Associated Packets", "Number of retransmission packets "
199           "correctly associated with retransmission requests", 0, G_MAXUINT,
200           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
201
202   gst_element_class_add_pad_template (gstelement_class,
203       gst_static_pad_template_get (&src_factory));
204   gst_element_class_add_pad_template (gstelement_class,
205       gst_static_pad_template_get (&sink_factory));
206
207   gst_element_class_set_static_metadata (gstelement_class,
208       "RTP Retransmission receiver", "Codec",
209       "Receive retransmitted RTP packets according to RFC4588",
210       "Julien Isorce <julien.isorce@collabora.co.uk>");
211
212   gstelement_class->change_state =
213       GST_DEBUG_FUNCPTR (gst_rtp_rtx_receive_change_state);
214 }
215
216 static void
217 gst_rtp_rtx_receive_reset (GstRtpRtxReceive * rtx)
218 {
219   GST_OBJECT_LOCK (rtx);
220   g_hash_table_remove_all (rtx->ssrc2_ssrc1_map);
221   g_hash_table_remove_all (rtx->seqnum_ssrc1_map);
222   rtx->num_rtx_requests = 0;
223   rtx->num_rtx_packets = 0;
224   rtx->num_rtx_assoc_packets = 0;
225   GST_OBJECT_UNLOCK (rtx);
226 }
227
228 static void
229 gst_rtp_rtx_receive_finalize (GObject * object)
230 {
231   GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE (object);
232
233   g_hash_table_unref (rtx->ssrc2_ssrc1_map);
234   g_hash_table_unref (rtx->seqnum_ssrc1_map);
235   g_hash_table_unref (rtx->rtx_pt_map);
236   if (rtx->rtx_pt_map_structure)
237     gst_structure_free (rtx->rtx_pt_map_structure);
238
239   G_OBJECT_CLASS (gst_rtp_rtx_receive_parent_class)->finalize (object);
240 }
241
242 typedef struct
243 {
244   guint32 ssrc;
245   GstClockTime time;
246 } SsrcAssoc;
247
248 static SsrcAssoc *
249 ssrc_assoc_new (guint32 ssrc, GstClockTime time)
250 {
251   SsrcAssoc *assoc = g_slice_new (SsrcAssoc);
252
253   assoc->ssrc = ssrc;
254   assoc->time = time;
255
256   return assoc;
257 }
258
259 static void
260 ssrc_assoc_free (SsrcAssoc * assoc)
261 {
262   g_slice_free (SsrcAssoc, assoc);
263 }
264
265 static void
266 gst_rtp_rtx_receive_init (GstRtpRtxReceive * rtx)
267 {
268   GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx);
269
270   rtx->srcpad =
271       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
272           "src"), "src");
273   GST_PAD_SET_PROXY_CAPS (rtx->srcpad);
274   GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad);
275   gst_pad_set_event_function (rtx->srcpad,
276       GST_DEBUG_FUNCPTR (gst_rtp_rtx_receive_src_event));
277   gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad);
278
279   rtx->sinkpad =
280       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
281           "sink"), "sink");
282   GST_PAD_SET_PROXY_CAPS (rtx->sinkpad);
283   GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
284   gst_pad_set_chain_function (rtx->sinkpad,
285       GST_DEBUG_FUNCPTR (gst_rtp_rtx_receive_chain));
286   gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
287
288   rtx->ssrc2_ssrc1_map = g_hash_table_new (g_direct_hash, g_direct_equal);
289   rtx->seqnum_ssrc1_map = g_hash_table_new_full (g_direct_hash, g_direct_equal,
290       NULL, (GDestroyNotify) ssrc_assoc_free);
291
292   rtx->rtx_pt_map = g_hash_table_new (g_direct_hash, g_direct_equal);
293 }
294
295 static gboolean
296 gst_rtp_rtx_receive_src_event (GstPad * pad, GstObject * parent,
297     GstEvent * event)
298 {
299   GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE (parent);
300   gboolean res;
301
302   switch (GST_EVENT_TYPE (event)) {
303     case GST_EVENT_CUSTOM_UPSTREAM:
304     {
305       const GstStructure *s = gst_event_get_structure (event);
306
307       /* This event usually comes from the downstream gstrtpjitterbuffer */
308       if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
309         guint seqnum = 0;
310         guint ssrc = 0;
311         gpointer ssrc2 = 0;
312
313         /* retrieve seqnum of the packet that need to be retransmitted */
314         if (!gst_structure_get_uint (s, "seqnum", &seqnum))
315           seqnum = -1;
316
317         /* retrieve ssrc of the packet that need to be retransmitted
318          * it's usefull when reconstructing the original packet from the rtx packet */
319         if (!gst_structure_get_uint (s, "ssrc", &ssrc))
320           ssrc = -1;
321
322         GST_DEBUG_OBJECT (rtx,
323             "request seqnum: %" G_GUINT32_FORMAT ", ssrc: %" G_GUINT32_FORMAT,
324             seqnum, ssrc);
325
326         GST_OBJECT_LOCK (rtx);
327
328         /* increase number of seen requests for our statistics */
329         ++rtx->num_rtx_requests;
330
331         /* First, we lookup in our map to see if we have already associate this
332          * master stream ssrc with its retransmitted stream.
333          * Every ssrc are unique so we can use the same hash table
334          * for both retrieving the ssrc1 from ssrc2 and also ssrc2 from ssrc1
335          */
336         if (g_hash_table_lookup_extended (rtx->ssrc2_ssrc1_map,
337                 GUINT_TO_POINTER (ssrc), NULL, &ssrc2)
338             && GPOINTER_TO_UINT (ssrc2) != GPOINTER_TO_UINT (ssrc)) {
339           GST_DEBUG_OBJECT (rtx, "Retransmited stream %" G_GUINT32_FORMAT
340               " already associated to its master", GPOINTER_TO_UINT (ssrc2));
341         } else {
342           SsrcAssoc *assoc;
343
344           /* not already associated but also we have to check that we have not
345            * already considered this request.
346            */
347           if (g_hash_table_lookup_extended (rtx->seqnum_ssrc1_map,
348                   GUINT_TO_POINTER (seqnum), NULL, (gpointer *) & assoc)) {
349             if (assoc->ssrc == ssrc) {
350               /* do nothing because we have already considered this request
351                * The jitter may be too impatient of the rtx packet has been
352                * lost too.
353                * It does not mean we reject the event, we still want to forward
354                * the request to the gstrtpsession to be translater into a FB NACK
355                */
356               GST_DEBUG_OBJECT (rtx, "Duplicated request seqnum: %"
357                   G_GUINT32_FORMAT ", ssrc1: %" G_GUINT32_FORMAT, seqnum, ssrc);
358             } else {
359
360               /* If the association attempt is larger than ASSOC_TIMEOUT,
361                * then we give up on it, and try this one.
362                */
363               if (!GST_CLOCK_TIME_IS_VALID (rtx->last_time) ||
364                   !GST_CLOCK_TIME_IS_VALID (assoc->time) ||
365                   assoc->time + ASSOC_TIMEOUT < rtx->last_time) {
366                 /* From RFC 4588:
367                  * the receiver MUST NOT have two outstanding requests for the
368                  * same packet sequence number in two different original streams
369                  * before the association is resolved. Otherwise it's impossible
370                  * to associate a rtx stream and its master stream
371                  */
372
373                 /* remove seqnum in order to reuse the spot */
374                 g_hash_table_remove (rtx->seqnum_ssrc1_map,
375                     GUINT_TO_POINTER (seqnum));
376                 goto retransmit;
377               } else {
378                 GST_DEBUG_OBJECT (rtx,
379                     "reject request for seqnum %" G_GUINT32_FORMAT
380                     " of master stream %" G_GUINT32_FORMAT, seqnum, ssrc);
381
382                 /* do not forward the event as we are rejecting this request */
383                 GST_OBJECT_UNLOCK (rtx);
384                 gst_event_unref (event);
385                 return TRUE;
386               }
387             }
388           } else {
389           retransmit:
390             /* the request has not been already considered
391              * insert it for the first time */
392             g_hash_table_insert (rtx->seqnum_ssrc1_map,
393                 GUINT_TO_POINTER (seqnum),
394                 ssrc_assoc_new (ssrc, rtx->last_time));
395           }
396         }
397
398         GST_DEBUG_OBJECT (rtx,
399             "packet number %" G_GUINT32_FORMAT " of master stream %"
400             G_GUINT32_FORMAT " needs to be retransmitted", seqnum, ssrc);
401
402         GST_OBJECT_UNLOCK (rtx);
403       }
404
405       /* Transfer event upstream so that the request can acutally by translated
406        * through gstrtpsession through the network */
407       res = gst_pad_event_default (pad, parent, event);
408       break;
409     }
410     default:
411       res = gst_pad_event_default (pad, parent, event);
412       break;
413   }
414   return res;
415 }
416
417 /* Copy fixed header and extension. Replace current ssrc by ssrc1,
418  * remove OSN and replace current seq num by OSN.
419  * Copy memory to avoid to manually copy each rtp buffer field.
420  */
421 static GstBuffer *
422 _gst_rtp_buffer_new_from_rtx (GstRTPBuffer * rtp, guint32 ssrc1,
423     guint16 orign_seqnum, guint8 origin_payload_type)
424 {
425   GstMemory *mem = NULL;
426   GstRTPBuffer new_rtp = GST_RTP_BUFFER_INIT;
427   GstBuffer *new_buffer = gst_buffer_new ();
428   GstMapInfo map;
429   guint payload_len = 0;
430
431   /* copy fixed header */
432   mem = gst_memory_copy (rtp->map[0].memory,
433       (guint8 *) rtp->data[0] - rtp->map[0].data, rtp->size[0]);
434   gst_buffer_append_memory (new_buffer, mem);
435
436   /* copy extension if any */
437   if (rtp->size[1]) {
438     mem = gst_memory_copy (rtp->map[1].memory,
439         (guint8 *) rtp->data[1] - rtp->map[1].data, rtp->size[1]);
440     gst_buffer_append_memory (new_buffer, mem);
441   }
442
443   /* copy payload and remove OSN */
444   payload_len = rtp->size[2] - 2;
445   mem = gst_allocator_alloc (NULL, payload_len, NULL);
446
447   gst_memory_map (mem, &map, GST_MAP_WRITE);
448   if (rtp->size[2])
449     memcpy (map.data, (guint8 *) rtp->data[2] + 2, payload_len);
450   gst_memory_unmap (mem, &map);
451   gst_buffer_append_memory (new_buffer, mem);
452
453   /* the sender always constructs rtx packets without padding,
454    * But the receiver can still receive rtx packets with padding.
455    * So just copy it.
456    */
457   if (rtp->size[3]) {
458     guint pad_len = rtp->size[3];
459
460     mem = gst_allocator_alloc (NULL, pad_len, NULL);
461
462     gst_memory_map (mem, &map, GST_MAP_WRITE);
463     map.data[pad_len - 1] = pad_len;
464     gst_memory_unmap (mem, &map);
465
466     gst_buffer_append_memory (new_buffer, mem);
467   }
468
469   /* set ssrc and seq num */
470   gst_rtp_buffer_map (new_buffer, GST_MAP_WRITE, &new_rtp);
471   gst_rtp_buffer_set_ssrc (&new_rtp, ssrc1);
472   gst_rtp_buffer_set_seq (&new_rtp, orign_seqnum);
473   gst_rtp_buffer_set_payload_type (&new_rtp, origin_payload_type);
474   gst_rtp_buffer_unmap (&new_rtp);
475
476   gst_buffer_copy_into (new_buffer, rtp->buffer,
477       GST_BUFFER_COPY_FLAGS | GST_BUFFER_COPY_TIMESTAMPS, 0, -1);
478
479   return new_buffer;
480 }
481
482 static GstFlowReturn
483 gst_rtp_rtx_receive_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
484 {
485   GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE (parent);
486   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
487   GstFlowReturn ret = GST_FLOW_OK;
488   GstBuffer *new_buffer = NULL;
489   guint32 ssrc = 0;
490   gpointer ssrc1 = 0;
491   guint32 ssrc2 = 0;
492   guint16 seqnum = 0;
493   guint16 orign_seqnum = 0;
494   guint8 payload_type = 0;
495   guint8 origin_payload_type = 0;
496   gboolean is_rtx;
497   gboolean drop = FALSE;
498
499   /* map current rtp packet to parse its header */
500   gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
501   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
502   seqnum = gst_rtp_buffer_get_seq (&rtp);
503   payload_type = gst_rtp_buffer_get_payload_type (&rtp);
504
505   /* check if we have a retransmission packet (this information comes from SDP) */
506   GST_OBJECT_LOCK (rtx);
507
508   rtx->last_time = GST_BUFFER_PTS (buffer);
509
510   is_rtx =
511       g_hash_table_lookup_extended (rtx->rtx_pt_map,
512       GUINT_TO_POINTER (payload_type), NULL, NULL);
513
514   /* if the current packet is from a retransmission stream */
515   if (is_rtx) {
516     /* increase our statistic */
517     ++rtx->num_rtx_packets;
518
519     /* read OSN in the rtx payload */
520     orign_seqnum = GST_READ_UINT16_BE (gst_rtp_buffer_get_payload (&rtp));
521     origin_payload_type =
522         GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_pt_map,
523             GUINT_TO_POINTER (payload_type)));
524
525     /* first we check if we already have associated this retransmission stream
526      * to a master stream */
527     if (g_hash_table_lookup_extended (rtx->ssrc2_ssrc1_map,
528             GUINT_TO_POINTER (ssrc), NULL, &ssrc1)) {
529       GST_DEBUG_OBJECT (rtx,
530           "packet is from retransmission stream %" G_GUINT32_FORMAT
531           " already associated to master stream %" G_GUINT32_FORMAT, ssrc,
532           GPOINTER_TO_UINT (ssrc1));
533       ssrc2 = ssrc;
534     } else {
535       SsrcAssoc *assoc;
536
537       /* the current retransmitted packet has its rtx stream not already
538        * associated to a master stream, so retrieve it from our request
539        * history */
540       if (g_hash_table_lookup_extended (rtx->seqnum_ssrc1_map,
541               GUINT_TO_POINTER (orign_seqnum), NULL, (gpointer *) & assoc)) {
542         GST_DEBUG_OBJECT (rtx,
543             "associate retransmitted stream %" G_GUINT32_FORMAT
544             " to master stream %" G_GUINT32_FORMAT " thanks to packet %"
545             G_GUINT16_FORMAT "", ssrc, assoc->ssrc, orign_seqnum);
546         ssrc1 = GUINT_TO_POINTER (assoc->ssrc);
547         ssrc2 = ssrc;
548
549         /* just put a guard */
550         if (GPOINTER_TO_UINT (ssrc1) == ssrc2)
551           GST_WARNING_OBJECT (rtx, "RTX receiver ssrc2_ssrc1_map bad state, "
552               "ssrc %" G_GUINT32_FORMAT " are the same\n", ssrc);
553
554         /* free the spot so that this seqnum can be used to do another
555          * association */
556         g_hash_table_remove (rtx->seqnum_ssrc1_map,
557             GUINT_TO_POINTER (orign_seqnum));
558
559         /* actually do the association between rtx stream and master stream */
560         g_hash_table_insert (rtx->ssrc2_ssrc1_map, GUINT_TO_POINTER (ssrc2),
561             ssrc1);
562
563         /* also do the association between master stream and rtx stream
564          * every ssrc are unique so we can use the same hash table
565          * for both retrieving the ssrc1 from ssrc2 and also ssrc2 from ssrc1
566          */
567         g_hash_table_insert (rtx->ssrc2_ssrc1_map, ssrc1,
568             GUINT_TO_POINTER (ssrc2));
569
570       } else {
571         /* we are not able to associate this rtx packet with a master stream */
572         GST_DEBUG_OBJECT (rtx,
573             "drop rtx packet because its orign_seqnum %" G_GUINT16_FORMAT
574             " is not in pending retransmission requests", orign_seqnum);
575         drop = TRUE;
576       }
577     }
578   }
579
580   /* if not dropped the packet was successfully associated */
581   if (is_rtx && !drop)
582     ++rtx->num_rtx_assoc_packets;
583
584   GST_OBJECT_UNLOCK (rtx);
585
586   /* just drop the packet if the association could not have been made */
587   if (drop) {
588     gst_rtp_buffer_unmap (&rtp);
589     gst_buffer_unref (buffer);
590     return GST_FLOW_OK;
591   }
592
593   /* create the retransmission packet */
594   if (is_rtx)
595     new_buffer =
596         _gst_rtp_buffer_new_from_rtx (&rtp, GPOINTER_TO_UINT (ssrc1),
597         orign_seqnum, origin_payload_type);
598
599   gst_rtp_buffer_unmap (&rtp);
600
601   /* push the packet */
602   if (is_rtx) {
603     gst_buffer_unref (buffer);
604     GST_LOG_OBJECT (rtx, "push packet seqnum:%" G_GUINT16_FORMAT
605         " from a restransmission stream ssrc2:%" G_GUINT32_FORMAT " (src %"
606         G_GUINT32_FORMAT ")", orign_seqnum, ssrc2, GPOINTER_TO_UINT (ssrc1));
607     ret = gst_pad_push (rtx->srcpad, new_buffer);
608   } else {
609     GST_LOG_OBJECT (rtx, "push packet seqnum:%" G_GUINT16_FORMAT
610         " from a master stream ssrc: %" G_GUINT32_FORMAT, seqnum, ssrc);
611     ret = gst_pad_push (rtx->srcpad, buffer);
612   }
613
614   return ret;
615 }
616
617 static void
618 gst_rtp_rtx_receive_get_property (GObject * object,
619     guint prop_id, GValue * value, GParamSpec * pspec)
620 {
621   GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE (object);
622
623   switch (prop_id) {
624     case PROP_PAYLOAD_TYPE_MAP:
625       GST_OBJECT_LOCK (rtx);
626       g_value_set_boxed (value, rtx->rtx_pt_map_structure);
627       GST_OBJECT_UNLOCK (rtx);
628       break;
629     case PROP_NUM_RTX_REQUESTS:
630       GST_OBJECT_LOCK (rtx);
631       g_value_set_uint (value, rtx->num_rtx_requests);
632       GST_OBJECT_UNLOCK (rtx);
633       break;
634     case PROP_NUM_RTX_PACKETS:
635       GST_OBJECT_LOCK (rtx);
636       g_value_set_uint (value, rtx->num_rtx_packets);
637       GST_OBJECT_UNLOCK (rtx);
638       break;
639     case PROP_NUM_RTX_ASSOC_PACKETS:
640       GST_OBJECT_LOCK (rtx);
641       g_value_set_uint (value, rtx->num_rtx_assoc_packets);
642       GST_OBJECT_UNLOCK (rtx);
643       break;
644     default:
645       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
646       break;
647   }
648 }
649
650 static gboolean
651 structure_to_hash_table_inv (GQuark field_id, const GValue * value,
652     gpointer hash)
653 {
654   const gchar *field_str;
655   guint field_uint;
656   guint value_uint;
657
658   field_str = g_quark_to_string (field_id);
659   field_uint = atoi (field_str);
660   value_uint = g_value_get_uint (value);
661   g_hash_table_insert ((GHashTable *) hash, GUINT_TO_POINTER (value_uint),
662       GUINT_TO_POINTER (field_uint));
663
664   return TRUE;
665 }
666
667 static void
668 gst_rtp_rtx_receive_set_property (GObject * object,
669     guint prop_id, const GValue * value, GParamSpec * pspec)
670 {
671   GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE (object);
672
673   switch (prop_id) {
674     case PROP_PAYLOAD_TYPE_MAP:
675       GST_OBJECT_LOCK (rtx);
676       if (rtx->rtx_pt_map_structure)
677         gst_structure_free (rtx->rtx_pt_map_structure);
678       rtx->rtx_pt_map_structure = g_value_dup_boxed (value);
679       g_hash_table_remove_all (rtx->rtx_pt_map);
680       gst_structure_foreach (rtx->rtx_pt_map_structure,
681           structure_to_hash_table_inv, rtx->rtx_pt_map);
682       GST_OBJECT_UNLOCK (rtx);
683       break;
684     default:
685       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
686       break;
687   }
688 }
689
690 static GstStateChangeReturn
691 gst_rtp_rtx_receive_change_state (GstElement * element,
692     GstStateChange transition)
693 {
694   GstStateChangeReturn ret;
695   GstRtpRtxReceive *rtx;
696
697   rtx = GST_RTP_RTX_RECEIVE (element);
698
699   switch (transition) {
700     default:
701       break;
702   }
703
704   ret =
705       GST_ELEMENT_CLASS (gst_rtp_rtx_receive_parent_class)->change_state
706       (element, transition);
707
708   switch (transition) {
709     case GST_STATE_CHANGE_PAUSED_TO_READY:
710       gst_rtp_rtx_receive_reset (rtx);
711       break;
712     default:
713       break;
714   }
715
716   return ret;
717 }
718
719 gboolean
720 gst_rtp_rtx_receive_plugin_init (GstPlugin * plugin)
721 {
722   GST_DEBUG_CATEGORY_INIT (gst_rtp_rtx_receive_debug, "rtprtxreceive", 0,
723       "rtp retransmission receiver");
724
725   return gst_element_register (plugin, "rtprtxreceive", GST_RANK_NONE,
726       GST_TYPE_RTP_RTX_RECEIVE);
727 }