GST_END_TEST;
+typedef struct
+{
+ gint ready;
+ GMutex mutex;
+ GCond cond;
+} LockTestContext;
+
+static void
+new_ssrc_pad_cb (GstElement * element, guint ssrc, GstPad * pad,
+ LockTestContext * ctx)
+{
+ g_message ("Signalling ready");
+ g_atomic_int_set (&ctx->ready, 1);
+
+ g_message ("Waiting no more ready");
+ while (g_atomic_int_get (&ctx->ready))
+ g_usleep (G_USEC_PER_SEC / 100);
+
+ g_mutex_lock (&ctx->mutex);
+ g_mutex_unlock (&ctx->mutex);
+}
+
+static gpointer
+push_buffer_func (gpointer user_data)
+{
+ GstHarness *h = user_data;
+ gst_harness_push (h, create_buffer (0, 0xdeadbeef));
+ return NULL;
+}
+
+GST_START_TEST (test_oob_event_locking)
+{
+ GstHarness *h = gst_harness_new_with_padnames ("rtpssrcdemux", "sink", NULL);
+ LockTestContext ctx = { FALSE, };
+ GThread *thread;
+
+ g_mutex_init (&ctx.mutex);
+ g_cond_init (&ctx.cond);
+
+ gst_harness_set_src_caps_str (h, "application/x-rtp");
+ g_signal_connect (h->element,
+ "new-ssrc-pad", G_CALLBACK (new_ssrc_pad_cb), &ctx);
+
+ thread = g_thread_new ("streaming-thread", push_buffer_func, h);
+
+ g_mutex_lock (&ctx.mutex);
+
+ g_message ("Waiting for ready");
+ while (!g_atomic_int_get (&ctx.ready))
+ g_usleep (G_USEC_PER_SEC / 100);
+ g_message ("Signal no more ready");
+ g_atomic_int_set (&ctx.ready, 0);
+
+ gst_harness_push_event (h,
+ gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_OOB, NULL));
+
+ g_mutex_unlock (&ctx.mutex);
+
+ g_thread_join (thread);
+ g_mutex_clear (&ctx.mutex);
+ g_cond_clear (&ctx.cond);
+ gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
static Suite *
rtpssrcdemux_suite (void)
{
suite_add_tcase (s, tc_chain);
tcase_add_test (tc_chain, test_event_forwarding);
+ tcase_add_test (tc_chain, test_oob_event_locking);
return s;
}