rtpac3depay: should output audio/x-ac3 not audio/ac3
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / gst / rtp / gstrtpulpfecenc.c
1 /* GStreamer plugin for forward error correction
2  * Copyright (C) 2017 Pexip
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17  *
18  * Author: Mikhail Fludkov <misha@pexip.com>
19  */
20
21 /**
22  * SECTION:element-rtpulpfecenc
23  * @short_description: Generic RTP Forward Error Correction (FEC) encoder
24  * @title: rtpulpfecenc
25  *
26  * Generic Forward Error Correction (FEC) encoder using Uneven Level
27  * Protection (ULP) as described in RFC 5109.
28  *
29  * It differs from the RFC in one important way, it multiplexes the
30  * FEC packets in the same sequence number as media packets. This is to be
31  * compatible with libwebrtc as using in Google Chrome and with Microsoft
32  * Lync / Skype for Business.
33  *
34  * Be warned that after using this element, it is no longer possible to know if
35  * there is a gap in the media stream based on the sequence numbers as the FEC
36  * packets become interleaved with the media packets.
37  *
38  * This element will insert protection packets in any RTP stream, which
39  * can then be used on the receiving side to recover lost packets.
40  *
41  * This element rewrites packets' seqnums, which means that when combined
42  * with retransmission elements such as #GstRtpRtxSend, it *must* be
43  * placed upstream of those, otherwise retransmission requests will request
44  * incorrect seqnums.
45  *
46  * A payload type for the protection packets *must* be specified, different
47  * from the payload type of the protected packets, with the GstRtpUlpFecEnc:pt
48  * property.
49  *
50  * The marker bit of RTP packets is used to determine sets of packets to
51  * protect as a unit, in order to modulate the level of protection, this
52  * behaviour can be disabled with GstRtpUlpFecEnc:multipacket, but should
53  * be left enabled for video streams.
54  *
55  * The level of protection can be configured with two properties,
56  * #GstRtpUlpFecEnc:percentage and #GstRtpUlpFecEnc:percentage-important,
57  * the element will determine which percentage to use for a given set of
58  * packets based on the presence of the #GST_BUFFER_FLAG_NON_DROPPABLE
59  * flag, upstream payloaders are expected to set this flag on "important"
60  * packets such as those making up a keyframe.
61  *
62  * The percentage is expressed not in terms of bytes, but in terms of
63  * packets, this for implementation convenience. The drawback with this
64  * approach is that when using a percentage different from 100 %, and a
65  * low bitrate, entire frames may be contained in a single packet, leading
66  * to some packets not being protected, thus lowering the overall recovery
67  * rate on the receiving side.
68  *
69  * When using #GstRtpBin, this element should be inserted through the
70  * #GstRtpBin::request-fec-encoder signal.
71  *
72  * ## Example pipeline
73  *
74  * |[
75  * gst-launch-1.0 videotestsrc ! x264enc ! video/x-h264, profile=baseline ! rtph264pay pt=96 ! rtpulpfecenc percentage=100 pt=122 ! udpsink port=8888
76  * ]| This example will receive a stream with FEC and try to reconstruct the packets.
77  *
78  * Example programs are available at
79  * <https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/blob/master/examples/src/bin/rtpfecserver.rs>
80  * and
81  * <https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/blob/master/examples/src/bin/rtpfecclient.rs>
82  *
83  * See also: #GstRtpUlpFecDec, #GstRtpBin
84  * Since: 1.14
85  */
86
87 #include <gst/rtp/gstrtp-enumtypes.h>
88 #include <gst/rtp/gstrtpbuffer.h>
89 #include <string.h>
90
91 #include "gstrtpelements.h"
92 #include "rtpulpfeccommon.h"
93 #include "gstrtpulpfecenc.h"
94
95 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
96     GST_PAD_SINK,
97     GST_PAD_ALWAYS,
98     GST_STATIC_CAPS ("application/x-rtp"));
99
100 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
101     GST_PAD_SRC,
102     GST_PAD_ALWAYS,
103     GST_STATIC_CAPS ("application/x-rtp"));
104
105 #define UNDEF_PT                255
106
107 #define DEFAULT_PT              UNDEF_PT
108 #define DEFAULT_PCT             0
109 #define DEFAULT_PCT_IMPORTANT   0
110 #define DEFAULT_MULTIPACKET     TRUE
111
112 #define PACKETS_BUF_MAX_LENGTH  (RTP_ULPFEC_PROTECTED_PACKETS_MAX(TRUE))
113
114 GST_DEBUG_CATEGORY (gst_rtp_ulpfec_enc_debug);
115 #define GST_CAT_DEFAULT (gst_rtp_ulpfec_enc_debug)
116
117 G_DEFINE_TYPE (GstRtpUlpFecEnc, gst_rtp_ulpfec_enc, GST_TYPE_ELEMENT);
118 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (rtpulpfecenc, "rtpulpfecenc",
119     GST_RANK_NONE, GST_TYPE_RTP_ULPFEC_ENC, rtp_element_init (plugin));
120
121 enum
122 {
123   PROP_0,
124   PROP_PT,
125   PROP_MULTIPACKET,
126   PROP_PROTECTED,
127   PROP_PERCENTAGE,
128   PROP_PERCENTAGE_IMPORTANT,
129 };
130
131 #define RTP_FEC_MAP_INFO_NTH(ctx, data) (&g_array_index (\
132     ((GstRtpUlpFecEncStreamCtx *)ctx)->info_arr, \
133     RtpUlpFecMapInfo, \
134     GPOINTER_TO_UINT(data)))
135
136 static void
137 dump_stream_ctx_settings (GstRtpUlpFecEncStreamCtx * ctx)
138 {
139   GST_DEBUG_OBJECT (ctx->parent, "rtpulpfec settings for ssrc 0x%x, pt %u, "
140       "percentage %u, percentage important %u, multipacket %u, mux_seq %u",
141       ctx->ssrc, ctx->pt, ctx->percentage, ctx->percentage_important,
142       ctx->multipacket, ctx->mux_seq);
143 };
144
145 static void
146 gst_rtp_ulpfec_enc_stream_ctx_start (GstRtpUlpFecEncStreamCtx * ctx,
147     GQueue * packets, guint fec_packets)
148 {
149   GList *it = packets->tail;
150   guint i;
151
152   g_array_set_size (ctx->info_arr, packets->length);
153
154   for (i = 0; i < packets->length; ++i) {
155     GstBuffer *buffer = it->data;
156     RtpUlpFecMapInfo *info = RTP_FEC_MAP_INFO_NTH (ctx, i);
157
158     if (!rtp_ulpfec_map_info_map (gst_buffer_ref (buffer), info))
159       g_assert_not_reached ();
160
161     GST_LOG_RTP_PACKET (ctx->parent, "rtp header (incoming)", &info->rtp);
162
163     it = g_list_previous (it);
164   }
165
166   ctx->fec_packets = fec_packets;
167   ctx->fec_packet_idx = 0;
168 }
169
170 static void
171 gst_rtp_ulpfec_enc_stream_ctx_stop (GstRtpUlpFecEncStreamCtx * ctx)
172 {
173   g_array_set_size (ctx->info_arr, 0);
174   g_array_set_size (ctx->scratch_buf, 0);
175
176   ctx->fec_packets = 0;
177   ctx->fec_packet_idx = 0;
178 }
179
180 static void
181     gst_rtp_ulpfec_enc_stream_ctx_get_protection_parameters
182     (GstRtpUlpFecEncStreamCtx * ctx, guint16 * dst_seq_base, guint64 * dst_mask,
183     guint * dst_start, guint * dst_end)
184 {
185   guint media_packets = ctx->info_arr->len;
186   guint start = ctx->fec_packet_idx * media_packets / ctx->fec_packets;
187   guint end =
188       ((ctx->fec_packet_idx + 1) * media_packets + ctx->fec_packets -
189       1) / ctx->fec_packets - 1;
190   guint len = end - start + 1;
191   guint64 mask = 0;
192   guint16 seq_base = 0;
193   guint i;
194
195   len = MIN (len, RTP_ULPFEC_PROTECTED_PACKETS_MAX (TRUE));
196   end = start + len - 1;
197
198   for (i = start; i <= end; ++i) {
199     RtpUlpFecMapInfo *info = RTP_FEC_MAP_INFO_NTH (ctx, i);
200     guint16 seq = gst_rtp_buffer_get_seq (&info->rtp);
201
202     if (mask) {
203       gint diff = gst_rtp_buffer_compare_seqnum (seq_base, seq);
204       if (diff < 0) {
205         seq_base = seq;
206         mask = mask >> (-diff);
207       }
208       mask |= rtp_ulpfec_packet_mask_from_seqnum (seq, seq_base, TRUE);
209     } else {
210       seq_base = seq;
211       mask = rtp_ulpfec_packet_mask_from_seqnum (seq, seq_base, TRUE);
212     }
213   }
214
215   *dst_start = start;
216   *dst_end = end;
217   *dst_mask = mask;
218   *dst_seq_base = seq_base;
219 }
220
221 static GstBuffer *
222 gst_rtp_ulpfec_enc_stream_ctx_protect (GstRtpUlpFecEncStreamCtx * ctx,
223     guint8 pt, guint16 seq, guint32 timestamp, guint32 ssrc)
224 {
225   guint end = 0;
226   guint start = 0;
227   guint64 fec_mask = 0;
228   guint16 seq_base = 0;
229   GstBuffer *ret;
230   guint64 tmp_mask;
231   gboolean fec_mask_long;
232   guint i;
233
234   if (ctx->fec_packet_idx >= ctx->fec_packets)
235     return NULL;
236
237   g_array_set_size (ctx->scratch_buf, 0);
238   gst_rtp_ulpfec_enc_stream_ctx_get_protection_parameters (ctx, &seq_base,
239       &fec_mask, &start, &end);
240
241   tmp_mask = fec_mask;
242   fec_mask_long = rtp_ulpfec_mask_is_long (fec_mask);
243   for (i = start; i <= end; ++i) {
244     RtpUlpFecMapInfo *info = RTP_FEC_MAP_INFO_NTH (ctx, i);
245     guint64 packet_mask =
246         rtp_ulpfec_packet_mask_from_seqnum (gst_rtp_buffer_get_seq (&info->rtp),
247         seq_base,
248         TRUE);
249
250     if (tmp_mask & packet_mask) {
251       tmp_mask ^= packet_mask;
252       rtp_buffer_to_ulpfec_bitstring (&info->rtp, ctx->scratch_buf, FALSE,
253           fec_mask_long);
254     }
255   }
256
257   g_assert (tmp_mask == 0);
258   ret =
259       rtp_ulpfec_bitstring_to_fec_rtp_buffer (ctx->scratch_buf, seq_base,
260       fec_mask_long, fec_mask, FALSE, pt, seq, timestamp, ssrc);
261   ++ctx->fec_packet_idx;
262   return ret;
263 }
264
265 static void
266 gst_rtp_ulpfec_enc_stream_ctx_report_budget (GstRtpUlpFecEncStreamCtx * ctx)
267 {
268   GST_TRACE_OBJECT (ctx->parent, "budget = %f budget_important = %f",
269       ctx->budget, ctx->budget_important);
270 }
271
272 static void
273 gst_rtp_ulpfec_enc_stream_ctx_increment_budget (GstRtpUlpFecEncStreamCtx * ctx,
274     GstBuffer * buffer)
275 {
276   if (ctx->percentage == 0 && ctx->percentage_important == 0) {
277     if (ctx->budget > 0) {
278       ctx->budget = 0;
279       ctx->budget_important = 0;
280     }
281     if (ctx->budget < 0)
282       ctx->budget += ctx->budget_inc;
283
284     return;
285   }
286   ctx->budget += ctx->budget_inc;
287
288   if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_NON_DROPPABLE)) {
289     ctx->budget_important += ctx->budget_inc_important;
290   }
291
292   gst_rtp_ulpfec_enc_stream_ctx_report_budget (ctx);
293 }
294
295 static void
296 gst_rtp_ulpfec_enc_stream_ctx_decrement_budget (GstRtpUlpFecEncStreamCtx * ctx,
297     guint fec_packets_num)
298 {
299   if (ctx->budget_important >= 1.)
300     ctx->budget_important -= fec_packets_num;
301   ctx->budget -= fec_packets_num;
302
303   gst_rtp_ulpfec_enc_stream_ctx_report_budget (ctx);
304 }
305
306 static guint
307 gst_rtp_ulpfec_enc_stream_ctx_get_fec_packets_num (GstRtpUlpFecEncStreamCtx *
308     ctx)
309 {
310   g_assert_cmpfloat (ctx->budget_important, >=, 0.);
311
312   if (ctx->budget_important >= 1.)
313     return ctx->budget_important;
314   return ctx->budget > 0. ? (guint) ctx->budget : 0;
315 }
316
317 static void
318 gst_rtp_ulpfec_enc_stream_ctx_free_packets_buf (GstRtpUlpFecEncStreamCtx * ctx)
319 {
320   while (ctx->packets_buf.length)
321     gst_buffer_unref (g_queue_pop_tail (&ctx->packets_buf));
322 }
323
324 static void
325 gst_rtp_ulpfec_enc_stream_ctx_prepend_to_fec_buffer (GstRtpUlpFecEncStreamCtx *
326     ctx, GstRTPBuffer * rtp, guint buf_max_size)
327 {
328   GList *new_head;
329   if (ctx->packets_buf.length == buf_max_size) {
330     new_head = g_queue_pop_tail_link (&ctx->packets_buf);
331   } else {
332     new_head = g_list_alloc ();
333   }
334
335   gst_buffer_replace ((GstBuffer **) & new_head->data, rtp->buffer);
336   g_queue_push_head_link (&ctx->packets_buf, new_head);
337
338   g_assert_cmpint (ctx->packets_buf.length, <=, buf_max_size);
339 }
340
341 static GstFlowReturn
342 gst_rtp_ulpfec_enc_stream_ctx_push_fec_packets (GstRtpUlpFecEncStreamCtx * ctx,
343     guint8 pt, guint16 seq, guint32 timestamp, guint32 ssrc, guint8 twcc_ext_id,
344     GstRTPHeaderExtensionFlags twcc_ext_flags, guint8 twcc_appbits)
345 {
346   GstFlowReturn ret = GST_FLOW_OK;
347   guint fec_packets_num =
348       gst_rtp_ulpfec_enc_stream_ctx_get_fec_packets_num (ctx);
349
350   GST_LOG_OBJECT (ctx->parent, "ctx %p have %u fec packets to push", ctx,
351       fec_packets_num);
352   if (fec_packets_num) {
353     guint fec_packets_pushed = 0;
354     GstBuffer *latest_packet = ctx->packets_buf.head->data;
355     GstBuffer *fec = NULL;
356
357     gst_rtp_ulpfec_enc_stream_ctx_start (ctx, &ctx->packets_buf,
358         fec_packets_num);
359
360     while (NULL != (fec =
361             gst_rtp_ulpfec_enc_stream_ctx_protect (ctx, pt,
362                 seq + fec_packets_pushed, timestamp, ssrc))) {
363       gst_buffer_copy_into (fec, latest_packet, GST_BUFFER_COPY_TIMESTAMPS, 0,
364           -1);
365
366       /* If buffers in the stream we are protecting were meant to hold a TWCC seqnum,
367        * we also indicate that our protection buffers need one. At this point no seqnum
368        * has actually been set, we thus don't need to rewrite seqnums, simply indicate
369        * to RTPSession that the FEC buffers need one too */
370
371       /* FIXME: remove this logic once https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/923
372        * is addressed */
373       if (twcc_ext_id != 0) {
374         GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
375         guint16 data;
376
377         if (!gst_rtp_buffer_map (fec, GST_MAP_READWRITE, &rtp))
378           g_assert_not_reached ();
379
380         if (twcc_ext_flags & GST_RTP_HEADER_EXTENSION_ONE_BYTE) {
381           gst_rtp_buffer_add_extension_onebyte_header (&rtp, twcc_ext_id,
382               &data, sizeof (guint16));
383         } else if (twcc_ext_flags & GST_RTP_HEADER_EXTENSION_TWO_BYTE) {
384           gst_rtp_buffer_add_extension_twobytes_header (&rtp, twcc_appbits,
385               twcc_ext_id, &data, sizeof (guint16));
386         }
387
388         gst_rtp_buffer_unmap (&rtp);
389       }
390
391       GST_LOG_OBJECT (ctx->parent, "ctx %p pushing generated fec buffer %"
392           GST_PTR_FORMAT, ctx, fec);
393       ret = gst_pad_push (ctx->srcpad, fec);
394       if (GST_FLOW_OK == ret)
395         ++fec_packets_pushed;
396       else
397         break;
398     }
399
400     gst_rtp_ulpfec_enc_stream_ctx_stop (ctx);
401
402     g_assert_cmpint (fec_packets_pushed, <=, fec_packets_num);
403
404     ctx->num_packets_protected += ctx->packets_buf.length;
405     ctx->num_packets_fec += fec_packets_pushed;
406     ctx->seqnum_offset += fec_packets_pushed;
407     ctx->seqnum += fec_packets_pushed;
408   }
409
410   gst_rtp_ulpfec_enc_stream_ctx_decrement_budget (ctx, fec_packets_num);
411   return ret;
412 }
413
414 static void
415 gst_rtp_ulpfec_enc_stream_ctx_cache_packet (GstRtpUlpFecEncStreamCtx * ctx,
416     GstRTPBuffer * rtp, gboolean * dst_empty_packet_buffer,
417     gboolean * dst_push_fec)
418 {
419   if (ctx->multipacket) {
420     gst_rtp_ulpfec_enc_stream_ctx_prepend_to_fec_buffer (ctx, rtp,
421         PACKETS_BUF_MAX_LENGTH);
422     gst_rtp_ulpfec_enc_stream_ctx_increment_budget (ctx, rtp->buffer);
423
424     *dst_empty_packet_buffer = gst_rtp_buffer_get_marker (rtp);
425     *dst_push_fec = *dst_empty_packet_buffer;
426
427     GST_TRACE ("ctx %p pushing fec %u", ctx, *dst_push_fec);
428   } else {
429     gboolean push_fec;
430
431     gst_rtp_ulpfec_enc_stream_ctx_prepend_to_fec_buffer (ctx, rtp, 1);
432
433     push_fec = ctx->fec_nth == 0 ? FALSE :
434         0 == (ctx->num_packets_received % ctx->fec_nth);
435
436     ctx->budget = push_fec ? 1 : 0;
437     ctx->budget_important = 0;
438
439     *dst_push_fec = push_fec;
440     *dst_empty_packet_buffer = FALSE;
441
442     GST_TRACE ("ctx %p pushing fec %u", ctx, *dst_push_fec);
443   }
444 }
445
446 static void
447 gst_rtp_ulpfec_enc_stream_ctx_configure (GstRtpUlpFecEncStreamCtx * ctx,
448     guint pt, guint percentage, guint percentage_important,
449     gboolean multipacket)
450 {
451   ctx->pt = pt;
452   ctx->percentage = percentage;
453   ctx->percentage_important = percentage_important;
454   ctx->multipacket = multipacket;
455
456   ctx->fec_nth = percentage ? 100 / percentage : 0;
457   if (percentage) {
458     ctx->budget_inc = percentage / 100.;
459     ctx->budget_inc_important = percentage > percentage_important ?
460         ctx->budget_inc : percentage_important / 100.;
461   }
462 /*
463    else {
464     ctx->budget_inc = 0.0;
465   }
466 */
467   ctx->budget_inc_important = percentage > percentage_important ?
468       ctx->budget_inc : percentage_important / 100.;
469
470   dump_stream_ctx_settings (ctx);
471 }
472
473 static GstRtpUlpFecEncStreamCtx *
474 gst_rtp_ulpfec_enc_stream_ctx_new (guint ssrc,
475     GstElement * parent, GstPad * srcpad,
476     guint pt, guint percentage, guint percentage_important,
477     gboolean multipacket)
478 {
479   GstRtpUlpFecEncStreamCtx *ctx = g_new0 (GstRtpUlpFecEncStreamCtx, 1);
480
481   ctx->ssrc = ssrc;
482   ctx->parent = parent;
483   ctx->srcpad = srcpad;
484
485   ctx->seqnum = g_random_int_range (0, G_MAXUINT16 / 2);
486
487   ctx->info_arr = g_array_new (FALSE, TRUE, sizeof (RtpUlpFecMapInfo));
488   g_array_set_clear_func (ctx->info_arr,
489       (GDestroyNotify) rtp_ulpfec_map_info_unmap);
490   ctx->parent = parent;
491   ctx->scratch_buf = g_array_new (FALSE, TRUE, sizeof (guint8));
492   gst_rtp_ulpfec_enc_stream_ctx_configure (ctx, pt,
493       percentage, percentage_important, multipacket);
494
495   return ctx;
496 }
497
498 static void
499 gst_rtp_ulpfec_enc_stream_ctx_free (GstRtpUlpFecEncStreamCtx * ctx)
500 {
501   if (ctx->num_packets_received) {
502     GST_INFO_OBJECT (ctx->parent, "Actual FEC overhead is %4.2f%% (%u/%u)\n",
503         ctx->num_packets_fec * (double) 100. / ctx->num_packets_received,
504         ctx->num_packets_fec, ctx->num_packets_received);
505   }
506   gst_rtp_ulpfec_enc_stream_ctx_free_packets_buf (ctx);
507
508   g_assert (0 == ctx->info_arr->len);
509   g_array_free (ctx->info_arr, TRUE);
510   g_array_free (ctx->scratch_buf, TRUE);
511   g_free (ctx);
512 }
513
514 static GstFlowReturn
515 gst_rtp_ulpfec_enc_stream_ctx_process (GstRtpUlpFecEncStreamCtx * ctx,
516     GstBuffer * buffer, guint8 twcc_ext_id)
517 {
518   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
519   GstFlowReturn ret;
520   gboolean push_fec = FALSE;
521   gboolean empty_packet_buffer = FALSE;
522   GstRTPHeaderExtensionFlags twcc_ext_flags = 0;
523   guint8 twcc_appbits = 0;
524
525   ctx->num_packets_received++;
526
527   if (ctx->seqnum_offset > 0) {
528     buffer = gst_buffer_make_writable (buffer);
529     if (!gst_rtp_buffer_map (buffer,
530             GST_MAP_READWRITE | GST_RTP_BUFFER_MAP_FLAG_SKIP_PADDING, &rtp))
531       g_assert_not_reached ();
532     gst_rtp_buffer_set_seq (&rtp,
533         gst_rtp_buffer_get_seq (&rtp) + ctx->seqnum_offset);
534   } else {
535     if (!gst_rtp_buffer_map (buffer,
536             GST_MAP_READ | GST_RTP_BUFFER_MAP_FLAG_SKIP_PADDING, &rtp))
537       g_assert_not_reached ();
538   }
539
540   if (twcc_ext_id != 0) {
541     gpointer data;
542     guint size;
543
544     if (gst_rtp_buffer_get_extension_onebyte_header (&rtp, twcc_ext_id, 0,
545             &data, &size)) {
546       twcc_ext_flags |= GST_RTP_HEADER_EXTENSION_ONE_BYTE;
547     } else if (gst_rtp_buffer_get_extension_twobytes_header (&rtp,
548             &twcc_appbits, twcc_ext_id, 0, &data, &size)) {
549       twcc_ext_flags |= GST_RTP_HEADER_EXTENSION_TWO_BYTE;
550     } else {
551       twcc_ext_id = 0;
552     }
553   }
554
555   gst_rtp_ulpfec_enc_stream_ctx_cache_packet (ctx, &rtp, &empty_packet_buffer,
556       &push_fec);
557
558   if (push_fec) {
559     guint32 fec_timestamp = gst_rtp_buffer_get_timestamp (&rtp);
560     guint32 fec_ssrc = gst_rtp_buffer_get_ssrc (&rtp);
561     guint16 fec_seq = gst_rtp_buffer_get_seq (&rtp) + 1;
562
563     gst_rtp_buffer_unmap (&rtp);
564
565     ret = gst_pad_push (ctx->srcpad, buffer);
566     if (GST_FLOW_OK == ret)
567       ret =
568           gst_rtp_ulpfec_enc_stream_ctx_push_fec_packets (ctx, ctx->pt, fec_seq,
569           fec_timestamp, fec_ssrc, twcc_ext_id, twcc_ext_flags, twcc_appbits);
570   } else {
571     gst_rtp_buffer_unmap (&rtp);
572     ret = gst_pad_push (ctx->srcpad, buffer);
573   }
574
575   if (empty_packet_buffer)
576     gst_rtp_ulpfec_enc_stream_ctx_free_packets_buf (ctx);
577
578   return ret;
579 }
580
581 static GstRtpUlpFecEncStreamCtx *
582 gst_rtp_ulpfec_enc_aquire_ctx (GstRtpUlpFecEnc * fec, guint ssrc)
583 {
584   GstRtpUlpFecEncStreamCtx *ctx;
585
586   GST_OBJECT_LOCK (fec);
587   ctx = g_hash_table_lookup (fec->ssrc_to_ctx, GUINT_TO_POINTER (ssrc));
588   if (ctx == NULL) {
589     ctx =
590         gst_rtp_ulpfec_enc_stream_ctx_new (ssrc, GST_ELEMENT_CAST (fec),
591         fec->srcpad, fec->pt, fec->percentage,
592         fec->percentage_important, fec->multipacket);
593     g_hash_table_insert (fec->ssrc_to_ctx, GUINT_TO_POINTER (ssrc), ctx);
594   }
595   GST_OBJECT_UNLOCK (fec);
596
597   return ctx;
598 }
599
600 static GstFlowReturn
601 gst_rtp_ulpfec_enc_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
602 {
603   GstRtpUlpFecEnc *fec = GST_RTP_ULPFEC_ENC (parent);
604   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
605   GstFlowReturn ret;
606   guint ssrc = 0;
607   GstRtpUlpFecEncStreamCtx *ctx;
608
609   if (fec->pt == UNDEF_PT)
610     return gst_pad_push (fec->srcpad, buffer);
611
612   /* FIXME: avoid this additional mapping of the buffer to get the
613      ssrc! */
614   if (!gst_rtp_buffer_map (buffer,
615           GST_MAP_READ | GST_RTP_BUFFER_MAP_FLAG_SKIP_PADDING, &rtp)) {
616     g_assert_not_reached ();
617   }
618   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
619   gst_rtp_buffer_unmap (&rtp);
620
621   ctx = gst_rtp_ulpfec_enc_aquire_ctx (fec, ssrc);
622
623   ret = gst_rtp_ulpfec_enc_stream_ctx_process (ctx, buffer, fec->twcc_ext_id);
624
625   /* FIXME: does not work for multiple ssrcs */
626   fec->num_packets_protected = ctx->num_packets_protected;
627
628   return ret;
629 }
630
631 static void
632 gst_rtp_ulpfec_enc_configure_ctx (gpointer key, gpointer value,
633     gpointer user_data)
634 {
635   GstRtpUlpFecEnc *fec = user_data;
636   GstRtpUlpFecEncStreamCtx *ctx = value;
637
638   gst_rtp_ulpfec_enc_stream_ctx_configure (ctx, fec->pt,
639       fec->percentage, fec->percentage_important, fec->multipacket);
640 }
641
642 static guint8
643 _get_extmap_id_for_attribute (const GstStructure * s, const gchar * ext_name)
644 {
645   guint i;
646   guint8 extmap_id = 0;
647   guint n_fields = gst_structure_n_fields (s);
648
649   for (i = 0; i < n_fields; i++) {
650     const gchar *field_name = gst_structure_nth_field_name (s, i);
651     if (g_str_has_prefix (field_name, "extmap-")) {
652       const gchar *str = gst_structure_get_string (s, field_name);
653       if (str && g_strcmp0 (str, ext_name) == 0) {
654         gint64 id = g_ascii_strtoll (field_name + 7, NULL, 10);
655         if (id > 0 && id < 15) {
656           extmap_id = id;
657           break;
658         }
659       }
660     }
661   }
662   return extmap_id;
663 }
664
665 #define TWCC_EXTMAP_STR "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01"
666
667 static gboolean
668 gst_rtp_ulpfec_enc_event_sink (GstPad * pad, GstObject * parent,
669     GstEvent * event)
670 {
671   GstRtpUlpFecEnc *self = GST_RTP_ULPFEC_ENC (parent);
672
673   switch (GST_EVENT_TYPE (event)) {
674     case GST_EVENT_CAPS:
675     {
676       GstCaps *caps;
677       GstStructure *s;
678
679       gst_event_parse_caps (event, &caps);
680       s = gst_caps_get_structure (caps, 0);
681       self->twcc_ext_id = _get_extmap_id_for_attribute (s, TWCC_EXTMAP_STR);
682
683       GST_INFO_OBJECT (self, "TWCC extension ID: %u", self->twcc_ext_id);
684
685       break;
686     }
687     default:
688       break;
689   }
690
691   return gst_pad_event_default (pad, parent, event);
692 }
693
694 static void
695 gst_rtp_ulpfec_enc_set_property (GObject * object, guint prop_id,
696     const GValue * value, GParamSpec * pspec)
697 {
698   GstRtpUlpFecEnc *fec = GST_RTP_ULPFEC_ENC (object);
699
700   switch (prop_id) {
701     case PROP_PT:
702       fec->pt = g_value_get_uint (value);
703       break;
704     case PROP_MULTIPACKET:
705       fec->multipacket = g_value_get_boolean (value);
706       break;
707     case PROP_PERCENTAGE:
708       fec->percentage = g_value_get_uint (value);
709       break;
710     case PROP_PERCENTAGE_IMPORTANT:
711       fec->percentage_important = g_value_get_uint (value);
712       break;
713     default:
714       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
715       break;
716   }
717
718   GST_OBJECT_LOCK (fec);
719   g_hash_table_foreach (fec->ssrc_to_ctx, gst_rtp_ulpfec_enc_configure_ctx,
720       fec);
721   GST_OBJECT_UNLOCK (fec);
722 }
723
724 static void
725 gst_rtp_ulpfec_enc_get_property (GObject * object, guint prop_id,
726     GValue * value, GParamSpec * pspec)
727 {
728   GstRtpUlpFecEnc *fec = GST_RTP_ULPFEC_ENC (object);
729   switch (prop_id) {
730     case PROP_PT:
731       g_value_set_uint (value, fec->pt);
732       break;
733     case PROP_PROTECTED:
734       g_value_set_uint (value, fec->num_packets_protected);
735       break;
736     case PROP_PERCENTAGE:
737       g_value_set_uint (value, fec->percentage);
738       break;
739     case PROP_PERCENTAGE_IMPORTANT:
740       g_value_set_uint (value, fec->percentage_important);
741       break;
742     case PROP_MULTIPACKET:
743       g_value_set_boolean (value, fec->multipacket);
744       break;
745     default:
746       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
747       break;
748   }
749 }
750
751 static void
752 gst_rtp_ulpfec_enc_dispose (GObject * obj)
753 {
754   GstRtpUlpFecEnc *fec = GST_RTP_ULPFEC_ENC (obj);
755
756   if (fec->ssrc_to_ctx)
757     g_hash_table_destroy (fec->ssrc_to_ctx);
758   fec->ssrc_to_ctx = NULL;
759
760   G_OBJECT_CLASS (gst_rtp_ulpfec_enc_parent_class)->dispose (obj);
761 }
762
763 static void
764 gst_rtp_ulpfec_enc_init (GstRtpUlpFecEnc * fec)
765 {
766   fec->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
767   gst_element_add_pad (GST_ELEMENT (fec), fec->srcpad);
768
769   fec->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
770   GST_PAD_SET_PROXY_CAPS (fec->sinkpad);
771   GST_PAD_SET_PROXY_ALLOCATION (fec->sinkpad);
772   gst_pad_set_chain_function (fec->sinkpad,
773       GST_DEBUG_FUNCPTR (gst_rtp_ulpfec_enc_chain));
774   gst_pad_set_event_function (fec->sinkpad,
775       GST_DEBUG_FUNCPTR (gst_rtp_ulpfec_enc_event_sink));
776   gst_element_add_pad (GST_ELEMENT (fec), fec->sinkpad);
777
778   fec->ssrc_to_ctx = g_hash_table_new_full (NULL, NULL, NULL,
779       (GDestroyNotify) gst_rtp_ulpfec_enc_stream_ctx_free);
780 }
781
782 static void
783 gst_rtp_ulpfec_enc_class_init (GstRtpUlpFecEncClass * klass)
784 {
785   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
786   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
787
788   GST_DEBUG_CATEGORY_INIT (gst_rtp_ulpfec_enc_debug, "rtpulpfecenc", 0,
789       "FEC encoder element");
790
791   gst_element_class_add_pad_template (element_class,
792       gst_static_pad_template_get (&srctemplate));
793   gst_element_class_add_pad_template (element_class,
794       gst_static_pad_template_get (&sinktemplate));
795
796   gst_element_class_set_static_metadata (element_class,
797       "RTP FEC Encoder",
798       "Codec/Payloader/Network/RTP",
799       "Encodes RTP FEC (RFC5109)", "Mikhail Fludkov <misha@pexip.com>");
800
801   gobject_class->set_property =
802       GST_DEBUG_FUNCPTR (gst_rtp_ulpfec_enc_set_property);
803   gobject_class->get_property =
804       GST_DEBUG_FUNCPTR (gst_rtp_ulpfec_enc_get_property);
805   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_rtp_ulpfec_enc_dispose);
806
807   g_object_class_install_property (gobject_class, PROP_PT,
808       g_param_spec_uint ("pt", "payload type",
809           "The payload type of FEC packets", 0, 255, DEFAULT_PT,
810           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
811
812   g_object_class_install_property (gobject_class, PROP_MULTIPACKET,
813       g_param_spec_boolean ("multipacket", "Multipacket",
814           "Apply FEC on multiple packets", DEFAULT_MULTIPACKET,
815           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
816
817   g_object_class_install_property (gobject_class, PROP_PERCENTAGE,
818       g_param_spec_uint ("percentage", "Percentage",
819           "FEC overhead percentage for the whole stream", 0, 100, DEFAULT_PCT,
820           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
821
822   g_object_class_install_property (gobject_class, PROP_PERCENTAGE_IMPORTANT,
823       g_param_spec_uint ("percentage-important", "Percentage important",
824           "FEC overhead percentage for important packets",
825           0, 100, DEFAULT_PCT_IMPORTANT,
826           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
827
828   g_object_class_install_property (gobject_class, PROP_PROTECTED,
829       g_param_spec_uint ("protected", "Protected",
830           "Count of protected packets", 0, G_MAXUINT32, 0,
831           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
832 }