#define INTERNAL_STREAM_LOCK(obj) (g_rec_mutex_lock (&(obj)->padlock))
#define INTERNAL_STREAM_UNLOCK(obj) (g_rec_mutex_unlock (&(obj)->padlock))
+#define GST_PAD_FLAG_STICKIES_SENT (GST_PAD_FLAG_LAST << 0)
+#define GST_PAD_STICKIES_SENT(pad) (GST_OBJECT_FLAG_IS_SET (pad, GST_PAD_FLAG_STICKIES_SENT))
+#define GST_PAD_SET_STICKIES_SENT(pad) (GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_STICKIES_SENT))
+
typedef enum
{
RTP_PAD,
GstEvent *newevent;
newevent = add_ssrc_and_ref (*event, data->ssrc);
-
gst_pad_push_event (data->pad, newevent);
return TRUE;
}
-/* With internal stream lock held */
static void
forward_initial_events (GstRtpSsrcDemux * demux, guint32 ssrc, GstPad * pad,
PadType padtype)
dpads->rtp_pad = rtp_pad;
dpads->rtcp_pad = rtcp_pad;
- gst_pad_set_element_private (rtp_pad, dpads);
- gst_pad_set_element_private (rtcp_pad, dpads);
-
GST_OBJECT_LOCK (demux);
demux->srcpads = g_slist_prepend (demux->srcpads, dpads);
GST_OBJECT_UNLOCK (demux);
gst_pad_use_fixed_caps (rtcp_pad);
gst_pad_set_active (rtcp_pad, TRUE);
- forward_initial_events (demux, ssrc, rtp_pad, RTP_PAD);
- forward_initial_events (demux, ssrc, rtcp_pad, RTCP_PAD);
-
gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad);
gst_element_add_pad (GST_ELEMENT_CAST (demux), rtcp_pad);
GSList *walk = NULL;
GstEvent *newevent = NULL;
+ /* special case for EOS */
+ if (GST_EVENT_TYPE (fdata->event) == GST_EVENT_EOS)
+ GST_PAD_SET_STICKIES_SENT (pad);
+
+ if (GST_EVENT_IS_STICKY (fdata->event) && !GST_PAD_STICKIES_SENT (pad))
+ return FALSE;
+
GST_OBJECT_LOCK (fdata->demux);
for (walk = fdata->demux->srcpads; walk; walk = walk->next) {
GstRtpSsrcDemuxPads *dpads = (GstRtpSsrcDemuxPads *) walk->data;
if (srcpad == NULL)
goto create_failed;
+ if (!GST_PAD_STICKIES_SENT (srcpad)) {
+ forward_initial_events (demux, ssrc, srcpad, RTP_PAD);
+ GST_PAD_SET_STICKIES_SENT (srcpad);
+ }
+
/* push to srcpad */
ret = gst_pad_push (srcpad, buf);
if (srcpad == NULL)
goto create_failed;
+ if (!GST_PAD_STICKIES_SENT (srcpad)) {
+ forward_initial_events (demux, ssrc, srcpad, RTCP_PAD);
+ GST_PAD_SET_STICKIES_SENT (srcpad);
+ }
+
/* push to srcpad */
ret = gst_pad_push (srcpad, buf);
if ((res = gst_pad_peer_query (demux->rtp_sink, query))) {
gboolean live;
GstClockTime min_latency, max_latency;
- GstRtpSsrcDemuxPads *dpads;
-
- dpads = gst_pad_get_element_private (pad);
gst_query_parse_latency (query, &live, &min_latency, &max_latency);
- GST_DEBUG_OBJECT (demux, "peer min latency %" GST_TIME_FORMAT,
+ GST_DEBUG_OBJECT (pad, "peer min latency %" GST_TIME_FORMAT,
GST_TIME_ARGS (min_latency));
- GST_DEBUG_OBJECT (demux, "latency for SSRC %08x", dpads->ssrc);
-
gst_query_set_latency (query, live, min_latency, max_latency);
}
break;
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
#include <gst/rtp/gstrtpbuffer.h>
+#include <gst/rtp/gstrtcpbuffer.h>
+
#include <gst/check/gstcheck.h>
#include <gst/check/gstharness.h>
+#ifdef HAVE_VALGRIND
+# include <valgrind/valgrind.h>
+#else
+# define RUNNING_ON_VALGRIND 0
+#endif
+
#define TEST_BUF_CLOCK_RATE 8000
#define TEST_BUF_PT 0
#define TEST_BUF_SSRC 0x01BADBAD
return buf;
}
+
typedef struct
{
GstHarness *rtp_sink;
gst_harness_push_event (ctx.rtcp_sink, gst_event_new_eos ());
g_assert_cmpint (gst_harness_events_in_queue (ctx.rtp_src), ==, 0);
- g_assert_cmpint (gst_harness_events_in_queue (ctx.rtcp_src), ==, 2);
-
- event = gst_harness_pull_event (ctx.rtcp_src);
- g_assert_cmpint (event->type, ==, GST_EVENT_STREAM_START);
- gst_event_unref (event);
+ g_assert_cmpint (gst_harness_events_in_queue (ctx.rtcp_src), ==, 1);
event = gst_harness_pull_event (ctx.rtcp_src);
g_assert_cmpint (event->type, ==, GST_EVENT_EOS);
GST_END_TEST;
static void
-new_rtcp_ssrc_pad_found (GstElement * element, G_GNUC_UNUSED guint ssrc,
+new_rtcp_ssrc_pad_found (GstElement * element, guint ssrc,
G_GNUC_UNUSED GstPad * rtp_pad, GSList ** src_h)
{
GstHarness *h;
GST_END_TEST;
+static GstBuffer *
+generate_rtcp_sr_buffer (guint ssrc)
+{
+ GstBuffer *buf;
+ GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
+ GstRTCPPacket packet;
+
+ buf = gst_rtcp_buffer_new (1000);
+ fail_unless (gst_rtcp_buffer_map (buf, GST_MAP_READWRITE, &rtcp));
+ fail_unless (gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_SR, &packet));
+ gst_rtcp_packet_sr_set_sender_info (&packet, ssrc, 0, 0, 1, 1);
+ gst_rtcp_buffer_unmap (&rtcp);
+ return buf;
+}
+
+typedef struct
+{
+ GstHarness *rtp_h;
+ GstHarness *rtcp_h;
+} SimulCtx;
+
+static void
+_simul_ctx_new_ssrc_pad_cb (GstElement * element, guint ssrc,
+ GstPad * rtp_pad, SimulCtx * ctx)
+{
+ GstPad *rtcp_pad;
+ gchar *name;
+
+ gst_harness_add_element_src_pad (ctx->rtp_h, rtp_pad);
+
+ name = g_strdup_printf ("rtcp_src_%u", ssrc);
+ rtcp_pad = gst_element_get_static_pad (element, name);
+ gst_harness_add_element_src_pad (ctx->rtcp_h, rtcp_pad);
+ gst_object_unref (rtcp_pad);
+ g_free (name);
+}
+
+static gpointer
+_simul_ctx_push_rtp_buffers (gpointer user_data)
+{
+ SimulCtx *ctx = user_data;
+
+ gst_harness_set_src_caps_str (ctx->rtp_h, "application/x-rtp");
+ gst_harness_push (ctx->rtp_h, create_buffer (0, 1111));
+ return NULL;
+}
+
+static gpointer
+_simul_ctx_push_rtcp_buffers (gpointer user_data)
+{
+ SimulCtx *ctx = user_data;
+
+ g_usleep (10);
+ gst_harness_set_src_caps_str (ctx->rtcp_h, "application/x-rtcp");
+ gst_harness_push (ctx->rtcp_h, generate_rtcp_sr_buffer (1111));
+ return NULL;
+}
+
+GST_START_TEST (test_rtp_and_rtcp_arrives_simultaneously)
+{
+ guint r;
+ guint repeats = 1000;
+ if (RUNNING_ON_VALGRIND)
+ repeats = 2;
+
+ for (r = 0; r < repeats; r++) {
+ SimulCtx ctx;
+ GThread *t0, *t1;
+
+ ctx.rtp_h = gst_harness_new_with_padnames ("rtpssrcdemux", "sink", NULL);
+ ctx.rtcp_h =
+ gst_harness_new_with_element (ctx.rtp_h->element, "rtcp_sink", NULL);
+
+ g_signal_connect (ctx.rtp_h->element,
+ "new-ssrc-pad", (GCallback) _simul_ctx_new_ssrc_pad_cb, &ctx);
+ t0 = g_thread_new ("push rtp", _simul_ctx_push_rtp_buffers, &ctx);
+ t1 = g_thread_new ("push rtcp", _simul_ctx_push_rtcp_buffers, &ctx);
+
+ g_thread_join (t0);
+ g_thread_join (t1);
+
+ gst_harness_teardown (ctx.rtp_h);
+ gst_harness_teardown (ctx.rtcp_h);
+ }
+}
+
+GST_END_TEST;
static Suite *
rtpssrcdemux_suite (void)
tcase_add_test (tc_chain, test_rtpssrcdemux_rtcp_app);
tcase_add_test (tc_chain, test_rtpssrcdemux_invalid_rtp);
tcase_add_test (tc_chain, test_rtpssrcdemux_invalid_rtcp);
+ tcase_add_test (tc_chain, test_rtp_and_rtcp_arrives_simultaneously);
return s;
}