{
PROP_0,
PROP_PT,
- PROP_RECEIVED
+ PROP_RECEIVED,
+ PROP_PAYLOADS,
};
static RTPHistItem *
}
static void
-gst_rtp_red_history_update (GstRtpRedDec * self, GstRTPBuffer * rtp)
+gst_rtp_red_history_update (GstRtpRedDec * self, GQueue * rtp_history,
+ GstRTPBuffer * rtp)
{
RTPHistItem *item;
GList *link, *sibling;
* allocate a new link and a new item,
* otherwise reuse the tail (the oldest data) without any reallocations
*/
- if (self->rtp_history->length < RTP_HISTORY_MAX_SIZE) {
+ if (rtp_history->length < RTP_HISTORY_MAX_SIZE) {
item = rtp_hist_item_alloc ();
link = g_list_alloc ();
link->data = item;
} else {
- link = g_queue_pop_tail_link (self->rtp_history);
+ link = g_queue_pop_tail_link (rtp_history);
item = link->data;
}
/* Looking for a place to insert new link.
* The queue has newest to oldest rtp timestamps, so in 99% cases
* it is inserted before the head of the queue */
- sibling = g_list_find_custom (self->rtp_history->head,
+ sibling = g_list_find_custom (rtp_history->head,
GUINT_TO_POINTER (item->timestamp),
gst_rtp_red_history_find_less_or_equal);
- g_queue_push_nth_link (self->rtp_history,
- g_list_position (self->rtp_history->head, sibling), link);
+ g_queue_push_nth_link (rtp_history,
+ g_list_position (rtp_history->head, sibling), link);
}
static gboolean
static gboolean
gst_red_history_lost_seq_num_for_timestamp (GstRtpRedDec * self,
- guint32 timestamp, guint16 * dst_seq_num)
+ GQueue * rtp_history, guint32 timestamp, guint16 * dst_seq_num)
{
- GList *older_sibling = g_list_find_custom (self->rtp_history->head,
+ GList *older_sibling = g_list_find_custom (rtp_history->head,
GUINT_TO_POINTER (timestamp),
gst_rtp_red_history_find_less);
RTPHistItem *older;
gint seq_diff, lost_packet_idx;
if (NULL == older_sibling) {
- if (self->rtp_history->length == RTP_HISTORY_MAX_SIZE)
+ if (rtp_history->length == RTP_HISTORY_MAX_SIZE)
GST_WARNING_OBJECT (self, "History is too short. "
"Oldest rtp timestamp %u, looking for %u, size %u",
- RTP_HIST_ITEM_TIMESTAMP (self->rtp_history->tail->data),
- timestamp, self->rtp_history->length);
+ RTP_HIST_ITEM_TIMESTAMP (rtp_history->tail->data),
+ timestamp, rtp_history->length);
return FALSE;
}
if (NULL == older_sibling->prev) {
GST_WARNING_OBJECT (self, "RED block timestamp offset probably wrong. "
"Latest rtp timestamp %u, looking for %u, size %u",
- RTP_HIST_ITEM_TIMESTAMP (self->rtp_history->head->data),
- timestamp, self->rtp_history->length);
+ RTP_HIST_ITEM_TIMESTAMP (rtp_history->head->data),
+ timestamp, rtp_history->length);
return FALSE;
}
static GstBuffer *
gst_rtp_red_create_from_redundant_block (GstRtpRedDec * self,
- GstRTPBuffer * red_rtp, gsize * red_hdr_offset, gsize * red_payload_offset)
+ GQueue * rtp_history, GstRTPBuffer * red_rtp, gsize * red_hdr_offset,
+ gsize * red_payload_offset)
{
guint8 *payload = gst_rtp_buffer_get_payload (red_rtp);
guint8 *red_hdr = payload + *red_hdr_offset;
GstBuffer *ret = NULL;
guint16 lost_seq = 0;
- if (gst_red_history_lost_seq_num_for_timestamp (self, lost_timestamp,
- &lost_seq)) {
- GST_LOG_OBJECT (self, "Recovering from RED packet pt=%u ts=%u seq=%u"
- " len=%u present", rtp_red_block_get_payload_type (red_hdr),
- lost_timestamp, lost_seq, rtp_red_block_get_payload_length (red_hdr));
+ if (gst_red_history_lost_seq_num_for_timestamp (self, rtp_history,
+ lost_timestamp, &lost_seq)) {
+ GST_LOG_OBJECT (self,
+ "Recovering from RED packet pt=%u ts=%u seq=%u" " len=%u present",
+ rtp_red_block_get_payload_type (red_hdr), lost_timestamp, lost_seq,
+ rtp_red_block_get_payload_length (red_hdr));
ret =
gst_rtp_red_create_packet (self, red_rtp, FALSE,
rtp_red_block_get_payload_type (red_hdr), lost_seq, lost_timestamp,
}
static GstBuffer *
-gst_rtp_red_create_from_block (GstRtpRedDec * self, GstRTPBuffer * red_rtp,
- gsize * red_hdr_offset, gsize * red_payload_offset)
+gst_rtp_red_create_from_block (GstRtpRedDec * self, GQueue * rtp_history,
+ GstRTPBuffer * red_rtp, gsize * red_hdr_offset, gsize * red_payload_offset)
{
guint8 *payload = gst_rtp_buffer_get_payload (red_rtp);
if (rtp_red_block_is_redundant (payload + (*red_hdr_offset)))
- return gst_rtp_red_create_from_redundant_block (self, red_rtp,
+ return gst_rtp_red_create_from_redundant_block (self, rtp_history, red_rtp,
red_hdr_offset, red_payload_offset);
return gst_rtp_red_create_from_main_block (self, red_rtp, *red_hdr_offset,
}
static GstFlowReturn
-gst_rtp_red_process (GstRtpRedDec * self, GstRTPBuffer * red_rtp,
- gsize first_red_payload_offset)
+gst_rtp_red_process (GstRtpRedDec * self, GQueue * rtp_history,
+ GstRTPBuffer * red_rtp, gsize first_red_payload_offset)
{
gsize red_hdr_offset = 0;
gsize red_payload_offset = first_red_payload_offset;
GstFlowReturn ret = GST_FLOW_OK;
do {
- GstBuffer *buf =
- gst_rtp_red_create_from_block (self, red_rtp, &red_hdr_offset,
+ GstBuffer *buf = gst_rtp_red_create_from_block (self, rtp_history, red_rtp,
+ &red_hdr_offset,
&red_payload_offset);
if (buf)
ret = gst_pad_push (self->srcpad, buf);
return ret;
}
+static gboolean
+is_red_pt (GstRtpRedDec * self, guint8 pt)
+{
+ gboolean ret;
+
+ g_mutex_lock (&self->lock);
+ if (pt == self->pt) {
+ ret = TRUE;
+ goto done;
+ }
+
+ ret = self->payloads
+ && g_hash_table_contains (self->payloads, GINT_TO_POINTER (pt));
+
+done:
+ g_mutex_unlock (&self->lock);
+ return ret;
+}
+
static GstFlowReturn
gst_rtp_red_dec_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
GstRTPBuffer irtp = GST_RTP_BUFFER_INIT;
GstFlowReturn ret = GST_FLOW_OK;
gsize first_red_payload_offset = 0;
+ GQueue *rtp_history;
+ guint32 ssrc;
- if (self->pt == UNDEF_PT)
+ if (self->pt == UNDEF_PT && self->payloads == NULL)
return gst_pad_push (self->srcpad, buffer);
if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &irtp))
return gst_pad_push (self->srcpad, buffer);
- gst_rtp_red_history_update (self, &irtp);
+ ssrc = gst_rtp_buffer_get_ssrc (&irtp);
+
+ if (!(rtp_history =
+ g_hash_table_lookup (self->rtp_histories, GUINT_TO_POINTER (ssrc)))) {
+ rtp_history = g_queue_new ();
+ g_hash_table_insert (self->rtp_histories, GUINT_TO_POINTER (ssrc),
+ rtp_history);
+ }
+
+ gst_rtp_red_history_update (self, rtp_history, &irtp);
- if (self->pt != gst_rtp_buffer_get_payload_type (&irtp)) {
+ if (!is_red_pt (self, gst_rtp_buffer_get_payload_type (&irtp))) {
GST_LOG_RTP_PACKET (self, "rtp header (incoming)", &irtp);
gst_rtp_buffer_unmap (&irtp);
if (rtp_red_buffer_is_valid (self, &irtp, &first_red_payload_offset)) {
GST_DEBUG_RTP_PACKET (self, "rtp header (red)", &irtp);
- ret = gst_rtp_red_process (self, &irtp, first_red_payload_offset);
+ ret =
+ gst_rtp_red_process (self, rtp_history, &irtp,
+ first_red_payload_offset);
}
gst_rtp_buffer_unmap (&irtp);
{
GstRtpRedDec *self = GST_RTP_RED_DEC (obj);
- g_queue_free_full (self->rtp_history, rtp_hist_item_free);
+ g_hash_table_unref (self->rtp_histories);
+
+ if (self->payloads) {
+ g_hash_table_unref (self->payloads);
+ }
+
+ g_mutex_clear (&self->lock);
G_OBJECT_CLASS (gst_rtp_red_dec_parent_class)->dispose (obj);
}
static void
+free_rtp_history (GQueue * rtp_history)
+{
+ g_queue_free_full (rtp_history, rtp_hist_item_free);
+}
+
+static void
gst_rtp_red_dec_init (GstRtpRedDec * self)
{
GstPadTemplate *pad_template;
self->pt = DEFAULT_PT;
self->num_received = 0;
- self->rtp_history = g_queue_new ();
+ self->rtp_histories =
+ g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
+ (GDestroyNotify) free_rtp_history);
+ self->payloads = NULL;
+ g_mutex_init (&self->lock);
}
-
static void
gst_rtp_red_dec_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
switch (prop_id) {
case PROP_PT:
+ g_mutex_lock (&self->lock);
self->pt = g_value_get_int (value);
+ g_mutex_unlock (&self->lock);
+ break;
+ case PROP_PAYLOADS:
+ {
+ guint i, n_vals;
+
+ g_mutex_lock (&self->lock);
+ if (self->payloads) {
+ g_hash_table_unref (self->payloads);
+ self->payloads = NULL;
+ }
+
+ n_vals = gst_value_array_get_size (value);
+
+ if (n_vals > 0) {
+ self->payloads = g_hash_table_new (g_direct_hash, g_direct_equal);
+
+ for (i = 0; i < gst_value_array_get_size (value); i++) {
+ const GValue *val = gst_value_array_get_value (value, i);
+
+ g_hash_table_insert (self->payloads,
+ GINT_TO_POINTER (g_value_get_int (val)), NULL);
+ }
+ }
+ g_mutex_unlock (&self->lock);
break;
+ }
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
static void
+append_payload (gpointer key, gpointer value, GValue * array)
+{
+ GValue v = { 0, };
+ g_value_init (&v, G_TYPE_INT);
+ g_value_set_int (&v, GPOINTER_TO_INT (key));
+ gst_value_array_append_value (array, &v);
+ g_value_unset (&v);
+}
+
+static void
gst_rtp_red_dec_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec)
{
GstRtpRedDec *self = GST_RTP_RED_DEC (object);
switch (prop_id) {
case PROP_PT:
+ g_mutex_lock (&self->lock);
g_value_set_int (value, self->pt);
+ g_mutex_unlock (&self->lock);
break;
case PROP_RECEIVED:
g_value_set_uint (value, self->num_received);
break;
+ case PROP_PAYLOADS:
+ {
+ g_mutex_lock (&self->lock);
+ if (self->payloads) {
+ g_hash_table_foreach (self->payloads, (GHFunc) append_payload, value);
+ }
+ g_mutex_unlock (&self->lock);
+ break;
+ }
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
"Count of received packets",
0, G_MAXUINT32, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+ /**
+ * rtpreddec:payloads:
+ *
+ * All the RED payloads this decoder may encounter
+ *
+ * Since: 1.20
+ */
+ g_object_class_install_property (G_OBJECT_CLASS (klass),
+ PROP_PAYLOADS,
+ gst_param_spec_array ("payloads",
+ "RED payloads",
+ "All the RED payloads this decoder may encounter",
+ g_param_spec_int ("pt",
+ "payload type",
+ "A RED payload type",
+ MIN_PT, MAX_PT,
+ DEFAULT_PT,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS),
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)
+ );
+
GST_DEBUG_CATEGORY_INIT (gst_rtp_red_dec_debug, "rtpreddec", 0,
"RTP RED Decoder");
}