* Generic Forward Error Correction (FEC) decoder for Uneven Level
* Protection (ULP) as described in RFC 5109.
*
+ * It differs from the RFC in one important way, it multiplexes the
+ * FEC packets in the same sequence number as media packets. This is to be
+ * compatible with libwebrtc as using in Google Chrome and with Microsoft
+ * Lync / Skype for Business.
+ *
+ * This element will work in combination with an upstream #GstRtpStorage
+ * element and attempt to recover packets declared lost through custom
+ * 'GstRTPPacketLost' events, usually emitted by #GstRtpJitterBuffer.
+ *
+ * If no storage is provided using the #GstRtpUlpFecDec:storage
+ * property, it will try to get it from an element upstream.
+ *
+ * Additionally, the payload types of the protection packets *must* be
+ * provided to this element via its #GstRtpUlpFecDec:pt property.
+ *
+ * When using #GstRtpBin, this element should be inserted through the
+ * #GstRtpBin::request-fec-decoder signal.
+ *
+ * <refsect2>
+ * <title>Example pipeline</title>
+ * |[
+ * gst-launch-1.0 udpsrc port=8888 caps="application/x-rtp, payload=96, clock-rate=90000" ! rtpstorage size-time=220000000 ! rtpssrcdemux ! application/x-rtp, payload=96, clock-rate=90000, media=video, encoding-name=H264 ! rtpjitterbuffer do-lost=1 latency=200 ! rtpulpfecdec pt=122 ! rtph264depay ! avdec_h264 ! videoconvert ! autovideosink
+ * ]| This example will receive a stream with FEC and try to reconstruct the packets.
+ *
+ * Example programs are available at
+ * <https://github.com/sdroege/gstreamer-rs/blob/master/examples/src/bin/rtpfecserver.rs>
+ * and
+ * <https://github.com/sdroege/gstreamer-rs/blob/master/examples/src/bin/rtpfecclient.rs>.
+ *
+ * </refsect2>
+ *
+ * See also: #GstRtpUlpFecEnc, #GstRtpBin, #GstRtpStorage
* Since: 1.14
*/
guint8 fec_pt, guint16 lost_seq)
{
guint fec_packets = 0;
+ gsize i;
g_assert (NULL == self->info_media);
g_assert (0 == self->info_fec->len);
g_array_set_size (self->info_arr, gst_buffer_list_length (buflist));
- for (gsize i = 0;
+ for (i = 0;
i < gst_buffer_list_length (buflist) && !self->lost_packet_from_storage;
++i) {
GstBuffer *buffer = gst_buffer_list_get (buflist, i);
GST_LOG_RTP_PACKET (self, "rtp header (incoming)", &info->rtp);
if (lost_seq == gst_rtp_buffer_get_seq (&info->rtp)) {
- GST_DEBUG_OBJECT (self, "Received lost packet from from the storage");
+ GST_DEBUG_OBJECT (self, "Received lost packet from the storage");
g_list_free (self->info_media);
self->info_media = NULL;
self->lost_packet_from_storage = TRUE;
guint16 fec_seq_base)
{
guint64 mask = 0;
- for (GList * it = self->info_media; it; it = it->next) {
+ GList *it;
+
+ for (it = self->info_media; it; it = it->next) {
RtpUlpFecMapInfo *info = RTP_FEC_MAP_INFO_NTH (self, it->data);
mask |=
rtp_ulpfec_packet_mask_from_seqnum (gst_rtp_buffer_get_seq (&info->rtp),
gst_rtp_ulpfec_dec_is_recovered_pt_valid (GstRtpUlpFecDec * self, gint media_pt,
guint8 recovered_pt)
{
+ GList *it;
if (media_pt == recovered_pt)
return TRUE;
- for (GList * it = self->info_media; it; it = it->next) {
+ for (it = self->info_media; it; it = it->next) {
RtpUlpFecMapInfo *info = RTP_FEC_MAP_INFO_NTH (self, it->data);
if (gst_rtp_buffer_get_payload_type (&info->rtp) == recovered_pt)
return TRUE;
gboolean fec_mask_long = rtp_ulpfec_buffer_get_fechdr (&info_fec->rtp)->L;
guint16 fec_seq_base = rtp_ulpfec_buffer_get_seq_base (&info_fec->rtp);
GstBuffer *ret;
+ GList *it;
g_array_set_size (self->scratch_buf, 0);
rtp_buffer_to_ulpfec_bitstring (&info_fec->rtp, self->scratch_buf, TRUE,
fec_mask_long);
- for (GList * it = self->info_media; it; it = it->next) {
+ for (it = self->info_media; it; it = it->next) {
RtpUlpFecMapInfo *info = RTP_FEC_MAP_INFO_NTH (self, it->data);
guint64 packet_mask =
rtp_ulpfec_packet_mask_from_seqnum (gst_rtp_buffer_get_seq (&info->rtp),
return gst_buffer_ref (info->rtp.buffer);
}
+/* __has_builtin only works with clang, so test compiler version for gcc */
+/* Intel compiler and MSVC probably have their own things as well */
+/* TODO: make sure we use builtin for clang as well */
+#if defined(__GNUC__) && __GNUC__ >= 4
+#define rtp_ulpfec_ctz64 __builtin_ctzll
+#else
+static inline gint
+rtp_ulpfec_ctz64_inline (guint64 mask)
+{
+ gint nth_bit = 0;
+
+ do {
+ if ((mask & 1))
+ return nth_bit;
+ mask = mask >> 1;
+ } while (++nth_bit < 64);
+
+ return -1; /* should not be reached, since mask must not be 0 */
+}
+
+#define rtp_ulpfec_ctz64 rtp_ulpfec_ctz64_inline
+#endif
+
static GstBuffer *
gst_rtp_ulpfec_dec_recover (GstRtpUlpFecDec * self, guint32 ssrc, gint media_pt,
guint8 * dst_pt, guint16 * dst_seq)
{
guint64 media_mask = 0;
gint media_mask_seq_base = -1;
+ gsize i;
if (self->lost_packet_from_storage)
return gst_rtp_ulpfec_dec_recover_from_storage (self, dst_pt, dst_seq);
/* Looking for a FEC packet which can be used for recovery */
- for (gsize i = 0; i < self->info_fec->len; ++i) {
+ for (i = 0; i < self->info_fec->len; ++i) {
RtpUlpFecMapInfo *info = RTP_FEC_MAP_INFO_NTH (self,
g_ptr_array_index (self->info_fec, i));
guint16 seq_base = rtp_ulpfec_buffer_get_seq_base (&info->rtp);
/* Do we have any 1s? Checking if current FEC packet can be used for recovery */
if (0 != missing_packets_mask) {
- guint trailing_zeros = __builtin_ctzll (missing_packets_mask);
+ guint trailing_zeros = rtp_ulpfec_ctz64 (missing_packets_mask);
/* Is it the only 1 in the mask? Checking if we lacking single packet in
* that case FEC packet can be used for recovery */
- if (missing_packets_mask == (1ULL << trailing_zeros)) {
+ if (missing_packets_mask == (G_GUINT64_CONSTANT (1) << trailing_zeros)) {
GstBuffer *ret;
*dst_seq =
GstRtpUlpFecDec *self = GST_RTP_ULPFEC_DEC (parent);
if (G_LIKELY (GST_FLOW_OK == self->chain_return_val)) {
+ GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
+ buf = gst_buffer_make_writable (buf);
+
if (G_UNLIKELY (self->unset_discont_flag)) {
self->unset_discont_flag = FALSE;
- buf = gst_buffer_make_writable (buf);
GST_BUFFER_FLAG_UNSET (buf, GST_BUFFER_FLAG_DISCONT);
}
+
+ gst_rtp_buffer_map (buf, GST_MAP_WRITE, &rtp);
+ gst_rtp_buffer_set_seq (&rtp, self->next_seqnum++);
+ gst_rtp_buffer_unmap (&rtp);
+
return gst_pad_push (self->srcpad, buf);
}
gst_rtp_ulpfec_dec_recover (self, self->caps_ssrc, caps_pt,
&recovered_pt, &recovered_seq))) {
if (seqnum == recovered_seq) {
+ GstBuffer *sent_buffer;
+ GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
+
recovered_buffer = gst_buffer_make_writable (recovered_buffer);
GST_BUFFER_PTS (recovered_buffer) = timestamp;
/* GST_BUFFER_DURATION (recovered_buffer) = duration;
if (!self->lost_packet_from_storage)
rtp_storage_put_recovered_packet (self->storage,
- gst_buffer_ref (recovered_buffer), recovered_pt, self->caps_ssrc,
- recovered_seq);
+ recovered_buffer, recovered_pt, self->caps_ssrc, recovered_seq);
GST_DEBUG_OBJECT (self,
"Pushing recovered packet ssrc=0x%08x seq=%u %" GST_PTR_FORMAT,
self->caps_ssrc, seqnum, recovered_buffer);
+ sent_buffer = gst_buffer_copy_deep (recovered_buffer);
+
+ if (self->lost_packet_from_storage)
+ gst_buffer_unref (recovered_buffer);
+
+ gst_rtp_buffer_map (sent_buffer, GST_MAP_WRITE, &rtp);
+ gst_rtp_buffer_set_seq (&rtp, self->next_seqnum++);
+ gst_rtp_buffer_unmap (&rtp);
+
ret = FALSE;
self->unset_discont_flag = TRUE;
- self->chain_return_val = gst_pad_push (self->srcpad, recovered_buffer);
+ self->chain_return_val = gst_pad_push (self->srcpad, sent_buffer);
break;
}
- rtp_storage_put_recovered_packet (self->storage,
- recovered_buffer, recovered_pt, self->caps_ssrc, recovered_seq);
+ if (!self->lost_packet_from_storage) {
+ rtp_storage_put_recovered_packet (self->storage,
+ recovered_buffer, recovered_pt, self->caps_ssrc, recovered_seq);
+ } else {
+ gst_buffer_unref (recovered_buffer);
+ }
}
gst_rtp_ulpfec_dec_stop (self);
gst_event_has_name (event, "GstRTPPacketLost")) {
guint seqnum;
GstClockTime timestamp, duration;
+ GstStructure *s;
+
+ event = gst_event_make_writable (event);
+ s = gst_event_writable_structure (event);
g_assert (self->have_caps_ssrc);
- g_assert (self->storage);
- if (!gst_structure_get (gst_event_get_structure (event),
+ if (self->storage == NULL) {
+ GstQuery *q = gst_query_new_custom (GST_QUERY_CUSTOM,
+ gst_structure_new_empty ("GstRtpStorage"));
+
+ if (gst_pad_peer_query (self->sinkpad, q)) {
+ const GstStructure *s = gst_query_get_structure (q);
+
+ if (gst_structure_has_field_typed (s, "storage", G_TYPE_OBJECT)) {
+ gst_structure_get (s, "storage", G_TYPE_OBJECT, &self->storage, NULL);
+ }
+ }
+ gst_query_unref (q);
+ }
+
+ if (self->storage == NULL) {
+ GST_ELEMENT_WARNING (self, STREAM, FAILED, ("Internal storage not found"),
+ ("You need to add rtpstorage element upstream from rtpulpfecdec."));
+ return FALSE;
+ }
+
+ if (!gst_structure_get (s,
"seqnum", G_TYPE_UINT, &seqnum,
"timestamp", G_TYPE_UINT64, ×tamp,
"duration", G_TYPE_UINT64, &duration, NULL))
forward =
gst_rtp_ulpfec_dec_handle_packet_loss (self, seqnum, timestamp,
duration);
- if (forward)
+
+ if (forward) {
+ gst_structure_remove_field (s, "seqnum");
+ gst_structure_set (s, "might-have-been-fec", G_TYPE_BOOLEAN, TRUE, NULL);
++self->packets_unrecovered;
- else
+ } else {
++self->packets_recovered;
+ }
+
GST_DEBUG_OBJECT (self, "Unrecovered / Recovered: %lu / %lu",
(gulong) self->packets_unrecovered, (gulong) self->packets_recovered);
} else if (GST_EVENT_CAPS == GST_EVENT_TYPE (event)) {
self->fec_pt = DEFAULT_FEC_PT;
+ self->next_seqnum = g_random_int_range (0, G_MAXINT16);
+
self->chain_return_val = GST_FLOW_OK;
self->have_caps_ssrc = FALSE;
self->caps_ssrc = 0;
g_object_class_install_properties (gobject_class, N_PROPERTIES,
klass_properties);
+
+ g_assert (rtp_ulpfec_ctz64 (G_GUINT64_CONSTANT (0x1)) == 0);
+ g_assert (rtp_ulpfec_ctz64 (G_GUINT64_CONSTANT (0x8000000000000000)) == 63);
}