GstSrtpCipherType rtcp_cipher;
GstSrtpAuthType rtcp_auth;
GArray *keys;
+ guint recv_count;
+ guint recv_drop_count;
};
#ifdef HAVE_SRTP2
if (filter->session) {
GHashTableIter iter;
- gpointer key;
+ gpointer key, value;
g_hash_table_iter_init (&iter, filter->streams);
- while (g_hash_table_iter_next (&iter, &key, NULL)) {
+ while (g_hash_table_iter_next (&iter, &key, &value)) {
+ GstSrtpDecSsrcStream *stream = value;
GstStructure *ss;
guint32 ssrc = GPOINTER_TO_UINT (key);
srtp_err_status_t status;
}
ss = gst_structure_new ("application/x-srtp-stream",
- "ssrc", G_TYPE_UINT, ssrc, "roc", G_TYPE_UINT, roc, NULL);
+ "ssrc", G_TYPE_UINT, ssrc, "roc", G_TYPE_UINT, roc, "recv-count",
+ G_TYPE_UINT, stream->recv_count, "recv-drop-count", G_TYPE_UINT,
+ stream->recv_drop_count, NULL);
g_value_take_boxed (&v, ss);
gst_value_array_append_value (&va, &v);
}
gst_structure_take_value (s, "streams", &va);
+ gst_structure_set (s, "recv-count", G_TYPE_UINT, filter->recv_count, NULL);
+ gst_structure_set (s, "recv-drop-count", G_TYPE_UINT,
+ filter->recv_drop_count, NULL);
+ GST_LOG_OBJECT (filter, "stats: recv-count %u recv-drop-count %u",
+ filter->recv_count, filter->recv_drop_count);
g_value_unset (&v);
return s;
goto error;
}
- if (gst_structure_get (s, "srtp-key", GST_TYPE_BUFFER, &buf, NULL) || !buf) {
+ if (gst_structure_get (s, "srtp-key", GST_TYPE_BUFFER, &buf, NULL) && buf) {
#ifdef HAVE_SRTP2
GstBuffer *mki = NULL;
guint i;
return gst_srtp_dec_iterate_internal_links (pad, parent, TRUE);
}
-static void
+/* Partial backport to 1.22 of `gst_element_decorate_stream_id_internal`,
+ * which was introduced in 1.23 */
+static gchar *
+decorate_stream_id_private (GstElement * element, const gchar * stream_id)
+{
+ gchar *upstream_stream_id = NULL, *new_stream_id;
+ GstQuery *query;
+ gchar *uri = NULL;
+
+ /* Try to generate a stream-id from the URI query and
+ * if it fails take a random number instead */
+ query = gst_query_new_uri ();
+ if (gst_element_query (element, query)) {
+ gst_query_parse_uri (query, &uri);
+ }
+
+ if (uri) {
+ GChecksum *cs;
+
+ /* And then generate an SHA256 sum of the URI */
+ cs = g_checksum_new (G_CHECKSUM_SHA256);
+ g_checksum_update (cs, (const guchar *) uri, strlen (uri));
+ g_free (uri);
+ upstream_stream_id = g_strdup (g_checksum_get_string (cs));
+ g_checksum_free (cs);
+ } else {
+ /* Just get some random number if the URI query fails */
+ GST_FIXME_OBJECT (element, "Creating random stream-id, consider "
+ "implementing a deterministic way of creating a stream-id");
+ upstream_stream_id =
+ g_strdup_printf ("%08x%08x%08x%08x", g_random_int (), g_random_int (),
+ g_random_int (), g_random_int ());
+ }
+
+ gst_query_unref (query);
+
+ if (stream_id) {
+ new_stream_id = g_strconcat (upstream_stream_id, "/", stream_id, NULL);
+ } else {
+ new_stream_id = g_strdup (upstream_stream_id);
+ }
+
+ g_free (upstream_stream_id);
+
+ return new_stream_id;
+}
+
+static gboolean
gst_srtp_dec_push_early_events (GstSrtpDec * filter, GstPad * pad,
GstPad * otherpad, gboolean is_rtcp)
{
is_rtcp ? "rtcp" : "rtp");
gst_event_unref (otherev);
} else {
- new_stream_id = gst_pad_create_stream_id (pad, GST_ELEMENT (filter),
+ new_stream_id = decorate_stream_id_private (GST_ELEMENT (filter),
is_rtcp ? "rtcp" : "rtp");
}
else
caps = gst_caps_new_empty_simple ("application/x-rtp");
- gst_pad_set_caps (pad, caps);
+ ev = gst_event_new_caps (caps);
+ gst_pad_push_event (pad, ev);
gst_caps_unref (caps);
}
} else {
ev = gst_pad_get_sticky_event (otherpad, GST_EVENT_SEGMENT, 0);
- if (ev)
+ if (ev) {
gst_pad_push_event (pad, ev);
+ } else if (GST_PAD_IS_FLUSHING (otherpad)) {
+ /* We didn't get a Segment event from otherpad
+ * and otherpad is flushing => we are most likely shutting down */
+ goto err;
+ } else {
+ GST_WARNING_OBJECT (filter, "No Segment event to push");
+ goto err;
+ }
}
if (is_rtcp)
else
filter->rtp_has_segment = TRUE;
+ return TRUE;
+
+err:
+ return FALSE;
}
/*
GstMapInfo map;
srtp_err_status_t err;
gint size;
+ GstSrtpDecSsrcStream *stream;
GST_LOG_OBJECT (pad, "Received %s buffer of size %" G_GSIZE_FORMAT
" with SSRC = %u", is_rtcp ? "RTCP" : "RTP", gst_buffer_get_size (buf),
ssrc);
-
+ filter->recv_count++;
/* Change buffer to remove protection */
buf = gst_buffer_make_writable (buf);
if (is_rtcp) {
#ifdef HAVE_SRTP2
- GstSrtpDecSsrcStream *stream = find_stream_by_ssrc (filter, ssrc);
+ stream = find_stream_by_ssrc (filter, ssrc);
err = srtp_unprotect_rtcp_mki (filter->session, map.data, &size,
stream && stream->keys);
#ifdef HAVE_SRTP2
{
- GstSrtpDecSsrcStream *stream = find_stream_by_ssrc (filter, ssrc);
+ stream = find_stream_by_ssrc (filter, ssrc);
err = srtp_unprotect_mki (filter->session, map.data, &size,
stream && stream->keys);
err = srtp_unprotect (filter->session, map.data, &size);
#endif
}
-
+ stream = find_stream_by_ssrc (filter, ssrc);
+ if (stream == NULL) {
+ GST_WARNING_OBJECT (filter, "Could not find matching stream, dropping");
+ goto err;
+ }
+ stream->recv_count++;
/* Signal user depending on type of error */
switch (err) {
case srtp_err_status_ok:
case srtp_err_status_replay_fail:
GST_DEBUG_OBJECT (filter,
"Dropping replayed packet, probably retransmission");
+ stream->recv_drop_count++;
goto err;
case srtp_err_status_replay_old:
GST_DEBUG_OBJECT (filter,
"Dropping replayed old packet, probably retransmission");
+ stream->recv_drop_count++;
goto err;
case srtp_err_status_key_expired:{
- GstSrtpDecSsrcStream *stream;
-
- /* Check we have an existing stream to rekey */
- stream = find_stream_by_ssrc (filter, ssrc);
- if (stream == NULL) {
- GST_WARNING_OBJECT (filter, "Could not find matching stream, dropping");
- goto err;
- }
GST_OBJECT_UNLOCK (filter);
stream = request_key_with_signal (filter, ssrc, SIGNAL_HARD_LIMIT);
}
case srtp_err_status_auth_fail:
GST_WARNING_OBJECT (filter, "Error authentication packet, dropping");
+ stream->recv_drop_count++;
goto err;
case srtp_err_status_cipher_fail:
GST_WARNING_OBJECT (filter, "Error while decrypting packet, dropping");
+ stream->recv_drop_count++;
goto err;
default:
GST_WARNING_OBJECT (pad,
"Unable to unprotect buffer (unprotect failed code %d)", err);
+ stream->recv_drop_count++;
goto err;
}
-
gst_buffer_unmap (buf, &map);
gst_buffer_set_size (buf, size);
return TRUE;
err:
+ filter->recv_drop_count++;
gst_buffer_unmap (buf, &map);
return FALSE;
}
/* Push buffer to source pad */
if (is_rtcp) {
otherpad = filter->rtcp_srcpad;
- if (!filter->rtcp_has_segment)
- gst_srtp_dec_push_early_events (filter, filter->rtcp_srcpad,
- filter->rtp_srcpad, TRUE);
+ if (!filter->rtcp_has_segment) {
+ if (!gst_srtp_dec_push_early_events (filter, filter->rtcp_srcpad,
+ filter->rtp_srcpad, TRUE)) {
+ ret = GST_FLOW_FLUSHING;
+ goto drop_buffer;
+ }
+ }
} else {
otherpad = filter->rtp_srcpad;
- if (!filter->rtp_has_segment)
- gst_srtp_dec_push_early_events (filter, filter->rtp_srcpad,
- filter->rtcp_srcpad, FALSE);
+ if (!filter->rtp_has_segment) {
+ if (!gst_srtp_dec_push_early_events (filter, filter->rtp_srcpad,
+ filter->rtcp_srcpad, FALSE)) {
+ ret = GST_FLOW_FLUSHING;
+ goto drop_buffer;
+ }
+ }
}
+
ret = gst_pad_push (otherpad, buf);
return ret;
filter->rtp_has_segment = FALSE;
filter->rtcp_has_segment = FALSE;
+ filter->recv_count = 0;
+ filter->recv_drop_count = 0;
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
break;
gst_srtp_dec_clear_streams (filter);
g_hash_table_unref (filter->streams);
filter->streams = NULL;
-
#ifndef HAVE_SRTP2
g_hash_table_unref (filter->streams_roc_changed);
filter->streams_roc_changed = NULL;