c4d09bb49b2fa015417c4cb744a1a1d181609e8d
[platform/upstream/gst-plugins-good.git] / gst / rtp / rtpstoragestream.c
1 /* GStreamer plugin for forward error correction
2  * Copyright (C) 2017 Pexip
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17  *
18  * Author: Mikhail Fludkov <misha@pexip.com>
19  */
20
21 #include "rtpstoragestream.h"
22
23 #define GST_CAT_DEFAULT (gst_rtp_storage_debug)
24
25 static RtpStorageItem *
26 rtp_storage_item_new (GstBuffer * buffer, guint8 pt, guint16 seq)
27 {
28   RtpStorageItem *ret = g_slice_new0 (RtpStorageItem);
29   ret->buffer = buffer;
30   ret->pt = pt;
31   ret->seq = seq;
32   return ret;
33 }
34
35 static void
36 rtp_storage_item_free (RtpStorageItem * item)
37 {
38   g_assert (item->buffer != NULL);
39   gst_buffer_unref (item->buffer);
40   g_slice_free (RtpStorageItem, item);
41 }
42
43 static gint
44 rtp_storage_item_compare (gconstpointer a, gconstpointer b, gpointer userdata)
45 {
46   gint seq_diff = gst_rtp_buffer_compare_seqnum (
47       ((RtpStorageItem const *) a)->seq, ((RtpStorageItem const *) b)->seq);
48
49   if (seq_diff >= 0)
50     return 0;
51
52   return 1;
53 }
54
55 static void
56 rtp_storage_stream_resize (RtpStorageStream * stream, GstClockTime size_time)
57 {
58   GList *it;
59   guint i, too_old_buffers_num = 0;
60
61   g_assert (GST_CLOCK_TIME_IS_VALID (stream->max_arrival_time));
62   g_assert (GST_CLOCK_TIME_IS_VALID (size_time));
63   g_assert_cmpint (size_time, >, 0);
64
65   /* Iterating from oldest sequence numbers to newest */
66   for (i = 0, it = stream->queue.tail; it; it = it->prev, ++i) {
67     RtpStorageItem *item = it->data;
68     GstClockTime arrival_time = GST_BUFFER_DTS_OR_PTS (item->buffer);
69     if (GST_CLOCK_TIME_IS_VALID (arrival_time)) {
70       if (stream->max_arrival_time - arrival_time > size_time) {
71         too_old_buffers_num = i + 1;
72       } else
73         break;
74     }
75   }
76
77   for (i = 0; i < too_old_buffers_num; ++i) {
78     RtpStorageItem *item = g_queue_pop_tail (&stream->queue);
79     rtp_storage_item_free (item);
80   }
81 }
82
83 void
84 rtp_storage_stream_resize_and_add_item (RtpStorageStream * stream,
85     GstClockTime size_time, GstBuffer * buffer, guint8 pt, guint16 seq)
86 {
87   GstClockTime arrival_time = GST_BUFFER_DTS_OR_PTS (buffer);
88
89   if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (arrival_time))) {
90     if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (stream->max_arrival_time)))
91       stream->max_arrival_time = MAX (stream->max_arrival_time, arrival_time);
92     else
93       stream->max_arrival_time = arrival_time;
94
95     rtp_storage_stream_resize (stream, size_time);
96     rtp_storage_stream_add_item (stream, buffer, pt, seq);
97   } else {
98     rtp_storage_stream_add_item (stream, buffer, pt, seq);
99   }
100 }
101
102 RtpStorageStream *
103 rtp_storage_stream_new (guint32 ssrc)
104 {
105   RtpStorageStream *ret = g_slice_new0 (RtpStorageStream);
106   ret->max_arrival_time = GST_CLOCK_TIME_NONE;
107   ret->ssrc = ssrc;
108   g_mutex_init (&ret->stream_lock);
109   return ret;
110 }
111
112 void
113 rtp_storage_stream_free (RtpStorageStream * stream)
114 {
115   STREAM_LOCK (stream);
116   while (stream->queue.length)
117     rtp_storage_item_free (g_queue_pop_tail (&stream->queue));
118   STREAM_UNLOCK (stream);
119   g_mutex_clear (&stream->stream_lock);
120   g_slice_free (RtpStorageStream, stream);
121 }
122
123 void
124 rtp_storage_stream_add_item (RtpStorageStream * stream, GstBuffer * buffer,
125     guint8 pt, guint16 seq)
126 {
127   RtpStorageItem *item = rtp_storage_item_new (buffer, pt, seq);
128   GList *sibling = g_queue_find_custom (&stream->queue, item,
129       (GCompareFunc) rtp_storage_item_compare);
130
131   g_queue_insert_before (&stream->queue, sibling, item);
132 }
133
134 GstBufferList *
135 rtp_storage_stream_get_packets_for_recovery (RtpStorageStream * stream,
136     guint8 pt_fec, guint16 lost_seq)
137 {
138   guint ret_length = 0;
139   GList *end = NULL;
140   GList *start = NULL;
141   gboolean saw_fec = TRUE;      /* To initialize the start pointer in the loop below */
142   GList *it;
143
144   /* Looking for media stream chunk with FEC packets at the end, which could
145    * can have the lost packet. For example:
146    *
147    *   |#10 FEC|  |#9 FEC|  |#8| ... |#6|  |#5 FEC|  |#4 FEC|  |#3 FEC|  |#2|  |#1|  |#0|
148    *
149    * Say @lost_seq = 7. Want to return bufferlist with packets [#6 : #10]. Other
150    * packets are not relevant for recovery of packet 7.
151    *
152    * Or the lost packet can be in the storage. In that case single packet is returned.
153    * It can happen if:
154    * - it could have arrived right after it was considered lost (more of a corner case)
155    * - it was recovered together with the other lost packet (most likely)
156    */
157   for (it = stream->queue.tail; it; it = it->prev) {
158     RtpStorageItem *item = it->data;
159     gboolean found_end = FALSE;
160
161     /* Is the buffer we lost in the storage? */
162     if (item->seq == lost_seq) {
163       start = it;
164       end = it;
165       ret_length = 1;
166       break;
167     }
168
169     if (pt_fec == item->pt) {
170       gint seq_diff = gst_rtp_buffer_compare_seqnum (lost_seq, item->seq);
171
172       if (seq_diff >= 0) {
173         if (it->prev) {
174           gboolean media_next =
175               pt_fec != ((RtpStorageItem *) it->prev->data)->pt;
176           found_end = media_next;
177         } else
178           found_end = TRUE;
179       }
180       saw_fec = TRUE;
181     } else if (saw_fec) {
182       saw_fec = FALSE;
183       start = it;
184       ret_length = 0;
185     }
186
187     ++ret_length;
188     if (found_end) {
189       end = it;
190       break;
191     }
192   }
193
194   if (end && !start)
195     start = end;
196
197   if (start && end) {
198     GstBufferList *ret = gst_buffer_list_new_sized (ret_length);
199     GList *it;
200
201     for (it = start; it != end->prev; it = it->prev)
202       gst_buffer_list_add (ret,
203           gst_buffer_ref (((RtpStorageItem *) it->data)->buffer));
204     return ret;
205   }
206
207   return NULL;
208 }
209
210 GstBuffer *
211 rtp_storage_stream_get_redundant_packet (RtpStorageStream * stream,
212     guint16 lost_seq)
213 {
214   GList *it;
215   for (it = stream->queue.head; it; it = it->next) {
216     RtpStorageItem *item = it->data;
217     if (item->seq == lost_seq)
218       return gst_buffer_ref (item->buffer);
219   }
220   return NULL;
221 }