1 /* GStreamer plugin for forward error correction
2 * Copyright (C) 2017 Pexip
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18 * Author: Mikhail Fludkov <misha@pexip.com>
21 #include <gst/rtp/gstrtpbuffer.h>
23 #include "rtpstorage.h"
24 #include "rtpstoragestream.h"
26 #define GST_CAT_DEFAULT (gst_rtp_storage_debug)
30 SIGNAL_PACKET_RECOVERED,
34 static guint rtp_storage_signals[LAST_SIGNAL] = { 0 };
36 G_DEFINE_TYPE (RtpStorage, rtp_storage, G_TYPE_OBJECT);
38 #define STORAGE_LOCK(s) g_mutex_lock (&(s)->streams_lock)
39 #define STORAGE_UNLOCK(s) g_mutex_unlock (&(s)->streams_lock)
40 #define DEFAULT_SIZE_TIME (0)
43 rtp_storage_init (RtpStorage * self)
45 self->size_time = DEFAULT_SIZE_TIME;
46 self->streams = g_hash_table_new_full (NULL, NULL, NULL,
47 (GDestroyNotify) rtp_storage_stream_free);
48 g_mutex_init (&self->streams_lock);
52 rtp_storage_dispose (GObject * obj)
54 RtpStorage *self = RTP_STORAGE (obj);
56 g_hash_table_unref (self->streams);
58 STORAGE_UNLOCK (self);
59 g_mutex_clear (&self->streams_lock);
60 G_OBJECT_CLASS (rtp_storage_parent_class)->dispose (obj);
64 rtp_storage_class_init (RtpStorageClass * klass)
66 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
68 rtp_storage_signals[SIGNAL_PACKET_RECOVERED] =
69 g_signal_new ("packet-recovered", G_TYPE_FROM_CLASS (klass),
70 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_BUFFER);
72 gobject_class->dispose = rtp_storage_dispose;
76 rtp_storage_get_packets_for_recovery (RtpStorage * self, gint fec_pt,
77 guint32 ssrc, guint16 lost_seq)
79 GstBufferList *ret = NULL;
80 RtpStorageStream *stream;
82 if (0 == self->size_time) {
83 GST_WARNING_OBJECT (self, "Received request for recovery RTP packets"
84 " around lost_seqnum=%u fec_pt=%u for ssrc=%08x, but size is 0",
85 lost_seq, fec_pt, ssrc);
90 stream = g_hash_table_lookup (self->streams, GUINT_TO_POINTER (ssrc));
91 STORAGE_UNLOCK (self);
94 GST_ERROR_OBJECT (self, "Can't find ssrc = 0x08%x", ssrc);
97 if (stream->queue.length > 0) {
98 GST_LOG_OBJECT (self, "Looking for recovery packets for fec_pt=%u around"
99 " lost_seq=%u for ssrc=%08x", fec_pt, lost_seq, ssrc);
101 rtp_storage_stream_get_packets_for_recovery (stream, fec_pt,
104 GST_DEBUG_OBJECT (self, "Empty RTP storage for ssrc=%08x", ssrc);
106 STREAM_UNLOCK (stream);
113 rtp_storage_get_redundant_packet (RtpStorage * self, guint32 ssrc,
116 GstBuffer *ret = NULL;
117 RtpStorageStream *stream;
119 if (0 == self->size_time) {
120 GST_WARNING_OBJECT (self, "Received request for redundant RTP packet with"
121 " seq=%u for ssrc=%08x, but size is 0", lost_seq, ssrc);
126 stream = g_hash_table_lookup (self->streams, GUINT_TO_POINTER (ssrc));
127 STORAGE_UNLOCK (self);
129 if (NULL == stream) {
130 GST_ERROR_OBJECT (self, "Can't find ssrc = 0x%x", ssrc);
132 STREAM_LOCK (stream);
133 if (stream->queue.length > 0) {
134 ret = rtp_storage_stream_get_redundant_packet (stream, lost_seq);
136 GST_DEBUG_OBJECT (self, "Empty RTP storage for ssrc=%08x", ssrc);
138 STREAM_UNLOCK (stream);
145 rtp_storage_do_put_recovered_packet (RtpStorage * self,
146 GstBuffer * buffer, guint8 pt, guint32 ssrc, guint16 seq)
148 RtpStorageStream *stream;
151 stream = g_hash_table_lookup (self->streams, GUINT_TO_POINTER (ssrc));
152 STORAGE_UNLOCK (self);
156 GST_LOG_OBJECT (self,
157 "Storing recovered RTP packet with ssrc=%08x pt=%u seq=%u %"
158 GST_PTR_FORMAT, ssrc, pt, seq, buffer);
160 STREAM_LOCK (stream);
161 rtp_storage_stream_add_item (stream, buffer, pt, seq);
162 STREAM_UNLOCK (stream);
166 rtp_storage_put_recovered_packet (RtpStorage * self,
167 GstBuffer * buffer, guint8 pt, guint32 ssrc, guint16 seq)
169 rtp_storage_do_put_recovered_packet (self, buffer, pt, ssrc, seq);
170 g_signal_emit (self, rtp_storage_signals[SIGNAL_PACKET_RECOVERED], 0, buffer);
174 rtp_storage_append_buffer (RtpStorage * self, GstBuffer * buf)
176 GstRTPBuffer rtpbuf = GST_RTP_BUFFER_INIT;
177 RtpStorageStream *stream;
182 if (0 == self->size_time)
185 /* We are about to save it in the queue, it so it is better take a ref before
186 * mapping the buffer */
187 gst_buffer_ref (buf);
189 if (!gst_rtp_buffer_map (buf, GST_MAP_READ |
190 GST_RTP_BUFFER_MAP_FLAG_SKIP_PADDING, &rtpbuf)) {
191 gst_buffer_unref (buf);
195 ssrc = gst_rtp_buffer_get_ssrc (&rtpbuf);
196 pt = gst_rtp_buffer_get_payload_type (&rtpbuf);
197 seq = gst_rtp_buffer_get_seq (&rtpbuf);
201 stream = g_hash_table_lookup (self->streams, GUINT_TO_POINTER (ssrc));
202 if (NULL == stream) {
203 GST_DEBUG_OBJECT (self,
204 "New media stream (ssrc=0x%08x, pt=%u) detected", ssrc, pt);
205 stream = rtp_storage_stream_new (ssrc);
206 g_hash_table_insert (self->streams, GUINT_TO_POINTER (ssrc), stream);
209 STORAGE_UNLOCK (self);
211 GST_LOG_OBJECT (self,
212 "Storing RTP packet with ssrc=%08x pt=%u seq=%u %" GST_PTR_FORMAT,
215 STREAM_LOCK (stream);
217 /* Saving the buffer, now the storage owns it */
218 rtp_storage_stream_resize_and_add_item (stream, self->size_time, buf, pt,
221 STREAM_UNLOCK (stream);
223 gst_rtp_buffer_unmap (&rtpbuf);
225 if (GST_BUFFER_FLAG_IS_SET (buf, GST_RTP_BUFFER_FLAG_REDUNDANT)) {
226 gst_buffer_unref (buf);
234 rtp_storage_clear (RtpStorage * self)
237 g_hash_table_remove_all (self->streams);
238 STORAGE_UNLOCK (self);
242 rtp_storage_set_size (RtpStorage * self, GstClockTime size)
244 self->size_time = size;
245 if (0 == self->size_time)
246 rtp_storage_clear (self);
250 rtp_storage_get_size (RtpStorage * self)
252 return self->size_time;
256 rtp_storage_new (void)
258 return g_object_new (RTP_TYPE_STORAGE, NULL);