GST_END_TEST;
+static void
+crank_rtcp_thread (TestData * data, GstClockTime * time, GstClockID * id)
+{
+ gint queue_length;
+
+ queue_length = g_async_queue_length (data->rtcp_queue);
+ do {
+ *time = gst_clock_id_get_time (*id);
+ GST_DEBUG ("Advancing time to %" GST_TIME_FORMAT, GST_TIME_ARGS (*time));
+ if (*time > gst_clock_get_time (data->clock))
+ gst_test_clock_set_time (GST_TEST_CLOCK (data->clock), *time);
+ fail_unless_equals_pointer (gst_test_clock_process_next_clock_id
+ (GST_TEST_CLOCK (data->clock)), *id);
+
+ /* wait for the RTCP pad thread to output its data
+ * and start waiting on the next timeout */
+ gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data->clock), id);
+
+ /* and retry as long as there are no new RTCP packets out,
+ * because the RTCP thread may randomly decide to reschedule
+ * the RTCP timeout for later */
+ } while (g_async_queue_length (data->rtcp_queue) == queue_length);
+}
+
+GST_START_TEST (test_internal_sources_timeout)
+{
+ TestData data;
+ GstClockID id;
+ GstClockTime time;
+ GObject *internal_session;
+ guint internal_ssrc;
+ guint32 ssrc;
+ GstBuffer *buf;
+ GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
+ GstRTCPPacket rtcp_packet;
+ GstFlowReturn res;
+ gint i, j;
+
+ setup_testharness (&data, TRUE);
+ g_object_get (data.session, "internal-session", &internal_session, NULL);
+ g_object_set (internal_session, "internal-ssrc", 0xDEADBEEF, NULL);
+
+ /* only the RTCP thread waits on the clock */
+ gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
+
+ /* crank the RTCP pad thread until it creates a RR for its internal-ssrc
+ * source, since we have not pushed any RTP packets and it doesn't have
+ * any other source available */
+ crank_rtcp_thread (&data, &time, &id);
+
+ g_object_get (internal_session, "internal-ssrc", &internal_ssrc, NULL);
+ g_assert_cmpint (internal_ssrc, ==, 0xDEADBEEF);
+
+ /* verify that rtpsession has sent RR for an internally-created
+ * RTPSource that is using the internal-ssrc */
+ buf = g_async_queue_pop (data.rtcp_queue);
+ g_assert (buf != NULL);
+ g_assert (gst_rtcp_buffer_validate (buf));
+ gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp);
+ g_assert (gst_rtcp_buffer_get_first_packet (&rtcp, &rtcp_packet));
+ g_assert_cmpint (gst_rtcp_packet_get_type (&rtcp_packet), ==,
+ GST_RTCP_TYPE_RR);
+ ssrc = gst_rtcp_packet_rr_get_ssrc (&rtcp_packet);
+ g_assert_cmpint (ssrc, ==, internal_ssrc);
+ gst_rtcp_buffer_unmap (&rtcp);
+ gst_buffer_unref (buf);
+
+ /* ok, now let's push some RTP packets */
+ for (i = 1; i < 4; i++) {
+ gst_test_clock_advance_time (GST_TEST_CLOCK (data.clock),
+ 200 * GST_MSECOND);
+ buf =
+ generate_test_buffer (time + i * 200 * GST_MSECOND, FALSE, i, i * 200,
+ 0x01BADBAD);
+ res = gst_pad_push (data.src, buf);
+ fail_unless (res == GST_FLOW_OK || res == GST_FLOW_FLUSHING);
+ }
+
+ /* internal ssrc must have changed already */
+ g_object_get (internal_session, "internal-ssrc", &internal_ssrc, NULL);
+ g_assert_cmpint (ssrc, !=, internal_ssrc);
+ g_assert_cmpint (internal_ssrc, ==, 0x01BADBAD);
+
+ /* wait for SR */
+ crank_rtcp_thread (&data, &time, &id);
+
+ /* verify SR and RR */
+ j = 0;
+ for (i = 0; i < 2; i++) {
+ buf = g_async_queue_pop (data.rtcp_queue);
+ g_assert (buf != NULL);
+ g_assert (gst_rtcp_buffer_validate (buf));
+ gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp);
+ g_assert (gst_rtcp_buffer_get_first_packet (&rtcp, &rtcp_packet));
+ if (gst_rtcp_packet_get_type (&rtcp_packet) == GST_RTCP_TYPE_SR) {
+ gst_rtcp_packet_sr_get_sender_info (&rtcp_packet, &ssrc, NULL, NULL, NULL,
+ NULL);
+ g_assert_cmpint (ssrc, ==, internal_ssrc);
+ g_assert_cmpint (ssrc, ==, 0x01BADBAD);
+ j |= 0x1;
+ } else if (gst_rtcp_packet_get_type (&rtcp_packet) == GST_RTCP_TYPE_RR) {
+ ssrc = gst_rtcp_packet_rr_get_ssrc (&rtcp_packet);
+ g_assert_cmpint (ssrc, !=, internal_ssrc);
+ g_assert_cmpint (ssrc, ==, 0xDEADBEEF);
+ j |= 0x2;
+ }
+ gst_rtcp_buffer_unmap (&rtcp);
+ gst_buffer_unref (buf);
+ }
+ g_assert_cmpint (j, ==, 0x3); /* verify we got both SR and RR */
+
+ /* go 30 seconds in the future and observe both sources timing out:
+ * 0xDEADBEEF -> BYE, 0x01BADBAD -> becomes receiver only */
+ gst_test_clock_advance_time (GST_TEST_CLOCK (data.clock), 30 * GST_SECOND);
+ crank_rtcp_thread (&data, &time, &id);
+
+ /* verify BYE and RR */
+ j = 0;
+ for (i = 0; i < 2; i++) {
+ buf = g_async_queue_pop (data.rtcp_queue);
+ g_assert (buf != NULL);
+ g_assert (gst_rtcp_buffer_validate (buf));
+ gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp);
+
+ g_assert (gst_rtcp_buffer_get_first_packet (&rtcp, &rtcp_packet));
+ g_assert_cmpint (gst_rtcp_packet_get_type (&rtcp_packet), ==,
+ GST_RTCP_TYPE_RR);
+ ssrc = gst_rtcp_packet_rr_get_ssrc (&rtcp_packet);
+ if (ssrc == 0x01BADBAD) {
+ j |= 0x1;
+ g_assert_cmpint (ssrc, ==, internal_ssrc);
+ /* 2 => RR, SDES. There is no BYE here */
+ g_assert_cmpint (gst_rtcp_buffer_get_packet_count (&rtcp), ==, 2);
+ } else if (ssrc == 0xDEADBEEF) {
+ j |= 0x2;
+ g_assert_cmpint (ssrc, !=, internal_ssrc);
+ /* 3 => RR, SDES, BYE */
+ g_assert_cmpint (gst_rtcp_buffer_get_packet_count (&rtcp), ==, 3);
+ g_assert (gst_rtcp_packet_move_to_next (&rtcp_packet));
+ g_assert (gst_rtcp_packet_move_to_next (&rtcp_packet));
+ g_assert_cmpint (gst_rtcp_packet_get_type (&rtcp_packet), ==,
+ GST_RTCP_TYPE_BYE);
+ }
+
+ gst_rtcp_buffer_unmap (&rtcp);
+ gst_buffer_unref (buf);
+ }
+ g_assert_cmpint (j, ==, 0x3); /* verify we got both BYE and RR */
+
+ g_object_unref (internal_session);
+ destroy_testharness (&data);
+}
+
+GST_END_TEST;
+
static Suite *
gstrtpsession_suite (void)
{
suite_add_tcase (s, tc_chain);
tcase_add_test (tc_chain, test_multiple_ssrc_rr);
tcase_add_test (tc_chain, test_multiple_senders_roundrobin_rbs);
+ tcase_add_test (tc_chain, test_internal_sources_timeout);
return s;
}