rtpsession: Send EOS if all internal sources sent bye
authorOlivier Crête <olivier.crete@collabora.com>
Tue, 4 Jul 2017 21:42:25 +0000 (17:42 -0400)
committerOlivier Crête <olivier.crete@collabora.com>
Wed, 5 Jul 2017 01:14:10 +0000 (21:14 -0400)
The ones which are not internal should not matter, and we should
wait for all sources to have sent their BYEs.

And add unit test

https://bugzilla.gnome.org/show_bug.cgi?id=773218

gst/rtpmanager/rtpsession.c
tests/check/Makefile.am
tests/check/elements/rtpbin.c

index 612aa84..3364af2 100644 (file)
@@ -3958,7 +3958,7 @@ rtp_session_are_all_sources_bye (RTPSession * sess)
 
   g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
   while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
-    if (!src->marked_bye)
+    if (src->internal && !src->sent_bye)
       return FALSE;
   }
 
index 750bece..2de9d98 100644 (file)
@@ -533,6 +533,10 @@ elements_rtpbin_buffer_list_LDADD = $(GST_PLUGINS_BASE_LIBS) \
              $(GST_BASE_LIBS) $(GST_LIBS) $(GST_CHECK_LIBS) $(LDADD)
 elements_rtpbin_buffer_list_SOURCES = elements/rtpbin_buffer_list.c
 
+elements_rtpbin_LDADD = $(GST_PLUGINS_BASE_LIBS) \
+             -lgstrtp-$(GST_API_VERSION) \
+             $(GST_BASE_LIBS) $(GST_LIBS) $(GST_CHECK_LIBS) $(LDADD)
+
 elements_rtph261_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(GST_BASE_CFLAGS) $(AM_CFLAGS)
 elements_rtph261_LDADD = $(GST_PLUGINS_BASE_LIBS) -lgstrtp-$(GST_API_VERSION) $(GST_BASE_LIBS) $(LDADD)
 
index 67bbfdd..8a72e36 100644 (file)
  */
 
 #include <gst/check/gstcheck.h>
+#include <gst/check/gsttestclock.h>
+
+#include <gst/rtp/gstrtpbuffer.h>
+#include <gst/rtp/gstrtcpbuffer.h>
 
 GST_START_TEST (test_pads)
 {
@@ -689,6 +693,201 @@ GST_START_TEST (test_aux_receiver)
 
 GST_END_TEST;
 
+GST_START_TEST (test_sender_eos)
+{
+  GstElement *rtpsession;
+  GstBuffer *rtp_buffer;
+  GstBuffer *rtcp_buffer;
+  GstRTPBuffer rtpbuf = GST_RTP_BUFFER_INIT;
+  GstRTCPBuffer rtcpbuf = GST_RTCP_BUFFER_INIT;
+  GstRTCPPacket rtcppacket;
+  static GstStaticPadTemplate recv_tmpl =
+      GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK, GST_PAD_ALWAYS,
+      GST_STATIC_CAPS ("ANY"));
+  GstPad *send_rtp_sink;
+  GstPad *recv_rtcp_sink;
+  GstCaps *caps;
+  GstSegment segment;
+  GstPad *rtp_sink, *rtcp_sink;
+  GstClock *clock;
+  GstTestClock *tclock;
+  GstStructure *s;
+  guint ssrc = 1;
+  guint32 ssrc_in, packet_count, octet_count;
+  gboolean got_bye = FALSE;
+
+  clock = gst_test_clock_new ();
+  gst_system_clock_set_default (clock);
+  tclock = GST_TEST_CLOCK (clock);
+  gst_test_clock_set_time (tclock, 0);
+
+  rtpsession = gst_element_factory_make ("rtpsession", NULL);
+  send_rtp_sink = gst_element_get_request_pad (rtpsession, "send_rtp_sink");
+  recv_rtcp_sink = gst_element_get_request_pad (rtpsession, "recv_rtcp_sink");
+
+
+  rtp_sink = gst_check_setup_sink_pad_by_name (rtpsession, &recv_tmpl,
+      "send_rtp_src");
+  rtcp_sink = gst_check_setup_sink_pad_by_name (rtpsession, &recv_tmpl,
+      "send_rtcp_src");
+
+  gst_pad_set_active (rtp_sink, TRUE);
+  gst_pad_set_active (rtcp_sink, TRUE);
+
+  gst_element_set_state (rtpsession, GST_STATE_PLAYING);
+
+  /* Send initial events */
+
+  gst_segment_init (&segment, GST_FORMAT_TIME);
+  fail_unless (gst_pad_send_event (send_rtp_sink,
+          gst_event_new_stream_start ("id")));
+  fail_unless (gst_pad_send_event (send_rtp_sink,
+          gst_event_new_segment (&segment)));
+
+  fail_unless (gst_pad_send_event (recv_rtcp_sink,
+          gst_event_new_stream_start ("id")));
+  fail_unless (gst_pad_send_event (recv_rtcp_sink,
+          gst_event_new_segment (&segment)));
+
+  /* Get the suggested SSRC from the rtpsession */
+
+  caps = gst_pad_query_caps (send_rtp_sink, NULL);
+  s = gst_caps_get_structure (caps, 0);
+  gst_structure_get (s, "ssrc", G_TYPE_UINT, &ssrc, NULL);
+  gst_caps_unref (caps);
+
+  /* Send a RTP packet */
+
+  rtp_buffer = gst_rtp_buffer_new_allocate (10, 0, 0);
+  gst_rtp_buffer_map (rtp_buffer, GST_MAP_READWRITE, &rtpbuf);
+  gst_rtp_buffer_set_ssrc (&rtpbuf, 1);
+  gst_rtp_buffer_set_seq (&rtpbuf, 0);
+  gst_rtp_buffer_unmap (&rtpbuf);
+
+  fail_unless (gst_pad_chain (send_rtp_sink, rtp_buffer) == GST_FLOW_OK);
+
+  /* Make sure it went through */
+  fail_unless_equals_int (g_list_length (buffers), 1);
+  fail_unless_equals_pointer (buffers->data, rtp_buffer);
+  gst_check_drop_buffers ();
+
+  /* Advance time and send a packet to prevent source sender timeout */
+  gst_test_clock_set_time (tclock, 1 * GST_SECOND);
+
+  /* Just send a send packet to prevent timeout */
+  rtp_buffer = gst_rtp_buffer_new_allocate (10, 0, 0);
+  gst_rtp_buffer_map (rtp_buffer, GST_MAP_READWRITE, &rtpbuf);
+  gst_rtp_buffer_set_ssrc (&rtpbuf, 1);
+  gst_rtp_buffer_set_seq (&rtpbuf, 1);
+  gst_rtp_buffer_set_timestamp (&rtpbuf, 10);
+  gst_rtp_buffer_unmap (&rtpbuf);
+
+  fail_unless (gst_pad_chain (send_rtp_sink, rtp_buffer) == GST_FLOW_OK);
+
+  /* Make sure it went through */
+  fail_unless_equals_int (g_list_length (buffers), 1);
+  fail_unless_equals_pointer (buffers->data, rtp_buffer);
+  gst_check_drop_buffers ();
+
+  /* Advance clock twice and we shoudl have one RTCP packet at least */
+  gst_test_clock_crank (tclock);
+  gst_test_clock_crank (tclock);
+
+  g_mutex_lock (&check_mutex);
+  while (buffers == NULL)
+    g_cond_wait (&check_cond, &check_mutex);
+
+  fail_unless (gst_rtcp_buffer_map (buffers->data, GST_MAP_READ, &rtcpbuf));
+
+  fail_unless (gst_rtcp_buffer_get_first_packet (&rtcpbuf, &rtcppacket));
+
+  fail_unless_equals_int (gst_rtcp_packet_get_type (&rtcppacket),
+      GST_RTCP_TYPE_SR);
+  gst_rtcp_packet_sr_get_sender_info (&rtcppacket, &ssrc_in, NULL, NULL,
+      &packet_count, &octet_count);
+  fail_unless_equals_int (packet_count, 2);
+  fail_unless_equals_int (octet_count, 20);
+
+  fail_unless (gst_rtcp_packet_move_to_next (&rtcppacket));
+  fail_unless_equals_int (gst_rtcp_packet_get_type (&rtcppacket),
+      GST_RTCP_TYPE_SDES);
+
+  gst_rtcp_buffer_unmap (&rtcpbuf);
+  gst_check_drop_buffers ();
+
+  g_mutex_unlock (&check_mutex);
+
+
+  /* Create and send a valid RTCP reply packet */
+  rtcp_buffer = gst_rtcp_buffer_new (1500);
+  gst_rtcp_buffer_map (rtcp_buffer, GST_MAP_READWRITE, &rtcpbuf);
+  gst_rtcp_buffer_add_packet (&rtcpbuf, GST_RTCP_TYPE_RR, &rtcppacket);
+  gst_rtcp_packet_rr_set_ssrc (&rtcppacket, ssrc + 1);
+  gst_rtcp_packet_add_rb (&rtcppacket, ssrc, 0, 0, 0, 0, 0, 0);
+  gst_rtcp_buffer_add_packet (&rtcpbuf, GST_RTCP_TYPE_SDES, &rtcppacket);
+  gst_rtcp_packet_sdes_add_item (&rtcppacket, ssrc + 1);
+  gst_rtcp_packet_sdes_add_entry (&rtcppacket, GST_RTCP_SDES_CNAME, 3,
+      (guint8 *) "a@a");
+  gst_rtcp_packet_sdes_add_entry (&rtcppacket, GST_RTCP_SDES_NAME, 2,
+      (guint8 *) "aa");
+  gst_rtcp_packet_sdes_add_entry (&rtcppacket, GST_RTCP_SDES_END, 0,
+      (guint8 *) "");
+  gst_rtcp_buffer_unmap (&rtcpbuf);
+  fail_unless (gst_pad_chain (recv_rtcp_sink, rtcp_buffer) == GST_FLOW_OK);
+
+
+  /* Send a EOS to trigger sending a BYE message */
+  fail_unless (gst_pad_send_event (send_rtp_sink, gst_event_new_eos ()));
+
+  /* Crank to process EOS and wait for BYE */
+  for (;;) {
+    gst_test_clock_crank (tclock);
+    g_mutex_lock (&check_mutex);
+    while (buffers == NULL)
+      g_cond_wait (&check_cond, &check_mutex);
+
+    fail_unless (gst_rtcp_buffer_map (g_list_last (buffers)->data, GST_MAP_READ,
+            &rtcpbuf));
+    fail_unless (gst_rtcp_buffer_get_first_packet (&rtcpbuf, &rtcppacket));
+
+    while (gst_rtcp_packet_move_to_next (&rtcppacket)) {
+      if (gst_rtcp_packet_get_type (&rtcppacket) == GST_RTCP_TYPE_BYE) {
+        got_bye = TRUE;
+        break;
+      }
+    }
+    g_mutex_unlock (&check_mutex);
+    gst_rtcp_buffer_unmap (&rtcpbuf);
+
+    if (got_bye)
+      break;
+  }
+
+  gst_check_drop_buffers ();
+
+
+  fail_unless (GST_PAD_IS_EOS (rtp_sink));
+  fail_unless (GST_PAD_IS_EOS (rtcp_sink));
+
+  gst_pad_set_active (rtp_sink, FALSE);
+  gst_pad_set_active (rtcp_sink, FALSE);
+
+  gst_check_teardown_pad_by_name (rtpsession, "send_rtp_src");
+  gst_check_teardown_pad_by_name (rtpsession, "send_rtcp_src");
+  gst_element_release_request_pad (rtpsession, send_rtp_sink);
+  gst_object_unref (send_rtp_sink);
+  gst_element_release_request_pad (rtpsession, recv_rtcp_sink);
+  gst_object_unref (recv_rtcp_sink);
+
+  gst_check_teardown_element (rtpsession);
+
+  gst_system_clock_set_default (NULL);
+  gst_object_unref (clock);
+
+}
+
+GST_END_TEST;
+
 static Suite *
 rtpbin_suite (void)
 {
@@ -705,6 +904,7 @@ rtpbin_suite (void)
   tcase_add_test (tc_chain, test_decoder);
   tcase_add_test (tc_chain, test_aux_sender);
   tcase_add_test (tc_chain, test_aux_receiver);
+  tcase_add_test (tc_chain, test_sender_eos);
 
   return s;
 }