ecabffced657eefc746d370b3e9853df5259ee52
[platform/upstream/gst-plugins-good.git] / gst / rtp / rtpstorage.c
1 /* GStreamer plugin for forward error correction
2  * Copyright (C) 2017 Pexip
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17  *
18  * Author: Mikhail Fludkov <misha@pexip.com>
19  */
20
21 #include <gst/rtp/gstrtpbuffer.h>
22
23 #include "rtpstorage.h"
24 #include "rtpstoragestream.h"
25
26 #define GST_CAT_DEFAULT (gst_rtp_storage_debug)
27
28 enum
29 {
30   SIGNAL_PACKET_RECOVERED,
31   LAST_SIGNAL,
32 };
33
34 static guint rtp_storage_signals[LAST_SIGNAL] = { 0 };
35
36 G_DEFINE_TYPE (RtpStorage, rtp_storage, G_TYPE_OBJECT);
37
38 #define STORAGE_LOCK(s)   g_mutex_lock   (&(s)->streams_lock)
39 #define STORAGE_UNLOCK(s) g_mutex_unlock (&(s)->streams_lock)
40 #define DEFAULT_SIZE_TIME (0)
41
42 static void
43 rtp_storage_init (RtpStorage * self)
44 {
45   self->size_time = DEFAULT_SIZE_TIME;
46   self->streams = g_hash_table_new_full (NULL, NULL, NULL,
47       (GDestroyNotify) rtp_storage_stream_free);
48   g_mutex_init (&self->streams_lock);
49 }
50
51 static void
52 rtp_storage_dispose (GObject * obj)
53 {
54   RtpStorage *self = RTP_STORAGE (obj);
55   STORAGE_LOCK (self);
56   g_hash_table_unref (self->streams);
57   self->streams = NULL;
58   STORAGE_UNLOCK (self);
59   g_mutex_clear (&self->streams_lock);
60   G_OBJECT_CLASS (rtp_storage_parent_class)->dispose (obj);
61 }
62
63 static void
64 rtp_storage_class_init (RtpStorageClass * klass)
65 {
66   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
67
68   rtp_storage_signals[SIGNAL_PACKET_RECOVERED] =
69       g_signal_new ("packet-recovered", G_TYPE_FROM_CLASS (klass),
70       G_SIGNAL_RUN_LAST, 0, NULL, NULL,
71       g_cclosure_marshal_generic, G_TYPE_NONE, 1, GST_TYPE_BUFFER);
72
73   gobject_class->dispose = rtp_storage_dispose;
74 }
75
76 GstBufferList *
77 rtp_storage_get_packets_for_recovery (RtpStorage * self, gint fec_pt,
78     guint32 ssrc, guint16 lost_seq)
79 {
80   GstBufferList *ret = NULL;
81   RtpStorageStream *stream;
82
83   if (0 == self->size_time) {
84     GST_WARNING_OBJECT (self, "Received request for recovery RTP packets"
85         " around lost_seqnum=%u fec_pt=%u for ssrc=%08x, but size is 0",
86         lost_seq, fec_pt, ssrc);
87     return NULL;
88   }
89
90   STORAGE_LOCK (self);
91   stream = g_hash_table_lookup (self->streams, GUINT_TO_POINTER (ssrc));
92   STORAGE_UNLOCK (self);
93
94   if (NULL == stream) {
95     GST_ERROR_OBJECT (self, "Cant find ssrc = 0x08%x", ssrc);
96   } else {
97     STREAM_LOCK (stream);
98     if (stream->queue.length > 0) {
99       GST_LOG_OBJECT (self, "Looking for recovery packets for fec_pt=%u around"
100           " lost_seq=%u for ssrc=%08x", fec_pt, lost_seq, ssrc);
101       ret =
102           rtp_storage_stream_get_packets_for_recovery (stream, fec_pt,
103           lost_seq);
104     } else {
105       GST_DEBUG_OBJECT (self, "Empty RTP storage for ssrc=%08x", ssrc);
106     }
107     STREAM_UNLOCK (stream);
108   }
109
110   return ret;
111 }
112
113 GstBuffer *
114 rtp_storage_get_redundant_packet (RtpStorage * self, guint32 ssrc,
115     guint16 lost_seq)
116 {
117   GstBuffer *ret = NULL;
118   RtpStorageStream *stream;
119
120   if (0 == self->size_time) {
121     GST_WARNING_OBJECT (self, "Received request for redundant RTP packet with"
122         " seq=%u for ssrc=%08x, but size is 0", lost_seq, ssrc);
123     return NULL;
124   }
125
126   STORAGE_LOCK (self);
127   stream = g_hash_table_lookup (self->streams, GUINT_TO_POINTER (ssrc));
128   STORAGE_UNLOCK (self);
129
130   if (NULL == stream) {
131     GST_ERROR_OBJECT (self, "Cant find ssrc = 0x%x", ssrc);
132   } else {
133     STREAM_LOCK (stream);
134     if (stream->queue.length > 0) {
135       ret = rtp_storage_stream_get_redundant_packet (stream, lost_seq);
136     } else {
137       GST_DEBUG_OBJECT (self, "Empty RTP storage for ssrc=%08x", ssrc);
138     }
139     STREAM_UNLOCK (stream);
140   }
141
142   return ret;
143 }
144
145 static void
146 rtp_storage_do_put_recovered_packet (RtpStorage * self,
147     GstBuffer * buffer, guint8 pt, guint32 ssrc, guint16 seq)
148 {
149   RtpStorageStream *stream;
150
151   STORAGE_LOCK (self);
152   stream = g_hash_table_lookup (self->streams, GUINT_TO_POINTER (ssrc));
153   STORAGE_UNLOCK (self);
154
155   g_assert (stream);
156
157   GST_LOG_OBJECT (self,
158       "Storing recovered RTP packet with ssrc=%08x pt=%u seq=%u %"
159       GST_PTR_FORMAT, ssrc, pt, seq, buffer);
160
161   STREAM_LOCK (stream);
162   rtp_storage_stream_add_item (stream, buffer, pt, seq);
163   STREAM_UNLOCK (stream);
164 }
165
166 void
167 rtp_storage_put_recovered_packet (RtpStorage * self,
168     GstBuffer * buffer, guint8 pt, guint32 ssrc, guint16 seq)
169 {
170   rtp_storage_do_put_recovered_packet (self, buffer, pt, ssrc, seq);
171   g_signal_emit (self, rtp_storage_signals[SIGNAL_PACKET_RECOVERED], 0, buffer);
172 }
173
174 gboolean
175 rtp_storage_append_buffer (RtpStorage * self, GstBuffer * buf)
176 {
177   GstRTPBuffer rtpbuf = GST_RTP_BUFFER_INIT;
178   RtpStorageStream *stream;
179   guint32 ssrc;
180   guint8 pt;
181   guint16 seq;
182
183   if (0 == self->size_time)
184     return TRUE;
185
186   /* We are about to save it in the queue, it so it is better take a ref before
187    * mapping the buffer */
188   gst_buffer_ref (buf);
189
190   if (!gst_rtp_buffer_map (buf, GST_MAP_READ |
191           GST_RTP_BUFFER_MAP_FLAG_SKIP_PADDING, &rtpbuf)) {
192     gst_buffer_unref (buf);
193     return TRUE;
194   }
195
196   ssrc = gst_rtp_buffer_get_ssrc (&rtpbuf);
197   pt = gst_rtp_buffer_get_payload_type (&rtpbuf);
198   seq = gst_rtp_buffer_get_seq (&rtpbuf);
199
200   STORAGE_LOCK (self);
201
202   stream = g_hash_table_lookup (self->streams, GUINT_TO_POINTER (ssrc));
203   if (NULL == stream) {
204     GST_DEBUG_OBJECT (self,
205         "New media stream (ssrc=0x%08x, pt=%u) detected", ssrc, pt);
206     stream = rtp_storage_stream_new (ssrc);
207     g_hash_table_insert (self->streams, GUINT_TO_POINTER (ssrc), stream);
208   }
209
210   STORAGE_UNLOCK (self);
211
212   GST_LOG_OBJECT (self,
213       "Storing RTP packet with ssrc=%08x pt=%u seq=%u %" GST_PTR_FORMAT,
214       ssrc, pt, seq, buf);
215
216   STREAM_LOCK (stream);
217
218   /* Saving the buffer, now the storage owns it */
219   rtp_storage_stream_resize_and_add_item (stream, self->size_time, buf, pt,
220       seq);
221
222   STREAM_UNLOCK (stream);
223
224   gst_rtp_buffer_unmap (&rtpbuf);
225
226   if (GST_BUFFER_FLAG_IS_SET (buf, GST_RTP_BUFFER_FLAG_REDUNDANT)) {
227     gst_buffer_unref (buf);
228     return FALSE;
229   }
230
231   return TRUE;
232 }
233
234 void
235 rtp_storage_clear (RtpStorage * self)
236 {
237   STORAGE_LOCK (self);
238   g_hash_table_remove_all (self->streams);
239   STORAGE_UNLOCK (self);
240 }
241
242 void
243 rtp_storage_set_size (RtpStorage * self, GstClockTime size)
244 {
245   self->size_time = size;
246   if (0 == self->size_time)
247     rtp_storage_clear (self);
248 }
249
250 GstClockTime
251 rtp_storage_get_size (RtpStorage * self)
252 {
253   return self->size_time;
254 }
255
256 RtpStorage *
257 rtp_storage_new (void)
258 {
259   return g_object_new (RTP_TYPE_STORAGE, NULL);
260 }