1 /* GStreamer plugin for forward error correction
2 * Copyright (C) 2017 Pexip
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18 * Author: Mikhail Fludkov <misha@pexip.com>
21 #include "rtpstoragestream.h"
23 #define GST_CAT_DEFAULT (gst_rtp_storage_debug)
/* Create a storage item for one RTP packet.
 *
 * The visible code obtains a zero-filled RtpStorageItem from the GLib slice
 * allocator (g_slice_new0).
 *
 * NOTE(review): the statements that fill in the item and return it are not
 * present in this copy of the file; presumably buffer, pt and seq are stored
 * in the new item -- confirm against the complete source.
 */
25 static RtpStorageItem *
26 rtp_storage_item_new (GstBuffer * buffer, guint8 pt, guint16 seq)
28 RtpStorageItem *ret = g_slice_new0 (RtpStorageItem);
/* Release a storage item.
 *
 * Unrefs the item's buffer and hands the item itself back to the slice
 * allocator. item->buffer must not be NULL; this is enforced with g_assert.
 *
 * NOTE(review): the return-type line and the braces of this definition are
 * not present in this copy of the file.
 */
36 rtp_storage_item_free (RtpStorageItem * item)
38 g_assert (item->buffer != NULL);
39 gst_buffer_unref (item->buffer);
40 g_slice_free (RtpStorageItem, item);
/* Comparison callback over two RtpStorageItem pointers (a and b).
 *
 * The visible code derives seq_diff by passing a->seq and b->seq to
 * gst_rtp_buffer_compare_seqnum (). userdata is not referenced in the
 * visible lines.
 *
 * NOTE(review): the statements that turn seq_diff into the return value are
 * missing from this copy; confirm the sign convention against the use in
 * rtp_storage_stream_add_item ().
 */
44 rtp_storage_item_compare (gconstpointer a, gconstpointer b, gpointer userdata)
46 gint seq_diff = gst_rtp_buffer_compare_seqnum (
47 ((RtpStorageItem const *) a)->seq, ((RtpStorageItem const *) b)->seq);
/* Drop queued items that are too old relative to stream->max_arrival_time.
 *
 * The queue is scanned from the tail towards the head. For an item whose
 * buffer has a valid DTS-or-PTS, if (max_arrival_time - arrival_time) exceeds
 * size_time, too_old_buffers_num is set to that item's 1-based position
 * counted from the tail. Afterwards that many items are popped from the tail
 * and freed.
 *
 * Preconditions (asserted): stream->max_arrival_time and size_time are valid
 * clock times and size_time is greater than zero.
 *
 * NOTE(review): the declaration of `it` and the lines that close the scan
 * loop are missing from this copy, so whether the scan stops at the first
 * item that is young enough cannot be confirmed from here.
 */
56 rtp_storage_stream_resize (RtpStorageStream * stream, GstClockTime size_time)
59 guint i, too_old_buffers_num = 0;
61 g_assert (GST_CLOCK_TIME_IS_VALID (stream->max_arrival_time));
62 g_assert (GST_CLOCK_TIME_IS_VALID (size_time));
63 g_assert_cmpint (size_time, >, 0);
65 /* Iterating from oldest sequence numbers to newest */
66 for (i = 0, it = stream->queue.tail; it; it = it->prev, ++i) {
67 RtpStorageItem *item = it->data;
68 GstClockTime arrival_time = GST_BUFFER_DTS_OR_PTS (item->buffer);
69 if (GST_CLOCK_TIME_IS_VALID (arrival_time)) {
70 if (stream->max_arrival_time - arrival_time > size_time) {
71 too_old_buffers_num = i + 1;
/* Pop the counted items from the tail and release each of them */
77 for (i = 0; i < too_old_buffers_num; ++i) {
78 RtpStorageItem *item = g_queue_pop_tail (&stream->queue);
80 GST_TRACE ("Removing %u/%u buffers, pt=%d seq=%d for ssrc=%08x",
81 i, too_old_buffers_num, item->pt, item->seq, stream->ssrc);
83 rtp_storage_item_free (item);
/* Store a buffer in the stream, trimming the queue first when the buffer
 * carries a usable timestamp.
 *
 * arrival_time is the buffer's DTS-or-PTS. When it is valid,
 * stream->max_arrival_time is brought up to date, the queue is resized
 * against size_time and the buffer is added via
 * rtp_storage_stream_add_item ().
 *
 * NOTE(review): several lines (the `else` lines and closing braces) are
 * missing from this copy, so the branch structure below is partly inferred;
 * see the inline notes.
 */
88 rtp_storage_stream_resize_and_add_item (RtpStorageStream * stream,
89 GstClockTime size_time, GstBuffer * buffer, guint8 pt, guint16 seq)
91 GstClockTime arrival_time = GST_BUFFER_DTS_OR_PTS (buffer);
93 if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (arrival_time))) {
94 if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (stream->max_arrival_time)))
95 stream->max_arrival_time = MAX (stream->max_arrival_time, arrival_time);
/* NOTE(review): presumably the else branch of the check above (no valid
 * max_arrival_time recorded yet) -- the `else` line is not visible here */
97 stream->max_arrival_time = arrival_time;
99 rtp_storage_stream_resize (stream, size_time);
100 rtp_storage_stream_add_item (stream, buffer, pt, seq);
/* NOTE(review): presumably the branch taken when arrival_time is invalid,
 * i.e. the buffer is added without resizing -- confirm */
102 rtp_storage_stream_add_item (stream, buffer, pt, seq);
/* Create a new RtpStorageStream.
 *
 * The visible code obtains a zero-filled stream from the slice allocator,
 * marks max_arrival_time as GST_CLOCK_TIME_NONE (no arrival time recorded
 * yet) and initialises the stream mutex.
 *
 * NOTE(review): the use of the ssrc argument and the return statement are
 * not visible in this copy; presumably ssrc is stored in the stream and the
 * stream is returned -- confirm.
 */
107 rtp_storage_stream_new (guint32 ssrc)
109 RtpStorageStream *ret = g_slice_new0 (RtpStorageStream);
110 ret->max_arrival_time = GST_CLOCK_TIME_NONE;
112 g_mutex_init (&ret->stream_lock);
/* Destroy a stream and everything it still holds.
 *
 * With the stream lock taken, every remaining item is popped from the tail
 * of the queue and freed. The lock is then released, the mutex is cleared
 * and the stream structure is returned to the slice allocator.
 */
117 rtp_storage_stream_free (RtpStorageStream * stream)
119 STREAM_LOCK (stream);
120 while (stream->queue.length)
121 rtp_storage_item_free (g_queue_pop_tail (&stream->queue));
122 STREAM_UNLOCK (stream);
123 g_mutex_clear (&stream->stream_lock);
124 g_slice_free (RtpStorageStream, stream);
/* Insert a packet into the stream's queue.
 *
 * A new RtpStorageItem is created for buffer/pt/seq. g_queue_find_custom ()
 * then looks for the first queued item for which rtp_storage_item_compare
 * reports a match, and the new item is inserted in front of that item.
 *
 * NOTE(review): rtp_storage_item_compare takes three arguments but is cast
 * to the two-argument GCompareFunc here. When no sibling is found, sibling
 * is NULL; GLib documents a NULL sibling for g_queue_insert_before () as
 * pushing at the tail -- confirm this is the intended ordering.
 */
128 rtp_storage_stream_add_item (RtpStorageStream * stream, GstBuffer * buffer,
129 guint8 pt, guint16 seq)
131 RtpStorageItem *item = rtp_storage_item_new (buffer, pt, seq);
132 GList *sibling = g_queue_find_custom (&stream->queue, item,
133 (GCompareFunc) rtp_storage_item_compare);
135 g_queue_insert_before (&stream->queue, sibling, item);
/* Collect the stored buffers that are relevant for recovering lost_seq.
 *
 * The queue is scanned from the tail towards the head. Items whose payload
 * type equals pt_fec are treated as FEC packets, and an item whose seq equals
 * lost_seq means the "lost" packet is already in the storage. Once the range
 * start..end has been determined, a GstBufferList sized for ret_length
 * entries is created and filled with new references to the buffers in that
 * range.
 *
 * NOTE(review): large parts of the scan loop, the declarations of it, start
 * and end, and the return statement are missing from this copy; the
 * description above covers only what the visible lines show.
 */
139 rtp_storage_stream_get_packets_for_recovery (RtpStorageStream * stream,
140 guint8 pt_fec, guint16 lost_seq)
142 guint ret_length = 0;
145 gboolean saw_fec = TRUE; /* To initialize the start pointer in the loop below */
148 /* Looking for a media stream chunk with FEC packets at the end, which could
149 * contain the lost packet. For example:
151 * |#10 FEC| |#9 FEC| |#8| ... |#6| |#5 FEC| |#4 FEC| |#3 FEC| |#2| |#1| |#0|
153 * Say @lost_seq = 7. Want to return bufferlist with packets [#6 : #10]. Other
154 * packets are not relevant for recovery of packet 7.
156 * Or the lost packet can be in the storage. In that case a single packet is returned.
158 * - it could have arrived right after it was considered lost (more of a corner case)
159 * - it was recovered together with the other lost packet (most likely)
161 for (it = stream->queue.tail; it; it = it->prev) {
162 RtpStorageItem *item = it->data;
163 gboolean found_end = FALSE;
165 /* Is the buffer we lost in the storage? */
166 if (item->seq == lost_seq) {
173 if (pt_fec == item->pt) {
174 gint seq_diff = gst_rtp_buffer_compare_seqnum (lost_seq, item->seq);
/* NOTE(review): it->prev is dereferenced below; the guard against a NULL
 * prev (and the use of seq_diff) is not visible in this copy -- confirm */
178 gboolean media_next =
179 pt_fec != ((RtpStorageItem *) it->prev->data)->pt;
180 found_end = media_next;
185 } else if (saw_fec) {
202 GstBufferList *ret = gst_buffer_list_new_sized (ret_length);
205 GST_LOG ("Found %u buffers with lost seq=%d for ssrc=%08x, creating %"
206 GST_PTR_FORMAT, ret_length, lost_seq, stream->ssrc, ret);
/* Walk from start towards the head and stop once end has been added; every
 * buffer is ref'ed so the list holds its own reference */
208 for (it = start; it != end->prev; it = it->prev)
209 gst_buffer_list_add (ret,
210 gst_buffer_ref (((RtpStorageItem *) it->data)->buffer));
/* Look up the stored packet whose sequence number equals lost_seq.
 *
 * The queue is walked from the head. For the first item with a matching seq,
 * the hit is logged and a new reference to its buffer is returned. When no
 * item matches, a debug message is emitted.
 *
 * NOTE(review): the rest of the parameter list (lost_seq), the declaration
 * of `it` and the final return are missing from this copy; presumably NULL
 * is returned when nothing is found -- confirm.
 */
218 rtp_storage_stream_get_redundant_packet (RtpStorageStream * stream,
222 for (it = stream->queue.head; it; it = it->next) {
223 RtpStorageItem *item = it->data;
224 if (item->seq == lost_seq) {
225 GST_LOG ("Found buffer pt=%u seq=%u for ssrc=%08x %" GST_PTR_FORMAT,
226 item->pt, item->seq, stream->ssrc, item->buffer);
227 return gst_buffer_ref (item->buffer);
230 GST_DEBUG ("Could not find packet with seq=%u for ssrc=%08x",
231 lost_seq, stream->ssrc);