#include <gst/rtp/gstrtpbuffer.h>
+/* UDP/IP is assumed for bandwidth calculation */
+#define UDP_IP_HEADER_OVERHEAD 28
/* This test makes sure that RTP packets sent as buffer lists are sent through
* the rtpbin as they are supposed to, and not corrupted in any way.
return buffer;
}
-static GstBufferList *
-create_buffer_list (void)
+static GstBuffer *
+create_rtp_buffer_fields (gconstpointer header, gint header_size,
+ GstBuffer * payload_buffer, gint payload_offset, gint payload_size,
+ guint16 seqnum, guint32 timestamp)
{
- GstBufferList *list;
- GstBuffer *orig_buffer;
GstBuffer *buffer;
+ GstMemory *memory;
+ GstMapInfo info;
+ gboolean ret;
+ guint32 *tmp;
- orig_buffer = create_original_buffer ();
- fail_if (orig_buffer == NULL);
+ buffer =
+ create_rtp_packet_buffer (header, header_size, payload_buffer,
+ payload_offset, payload_size);
+ fail_if (buffer == NULL);
- list = gst_buffer_list_new ();
- fail_if (list == NULL);
+ memory = gst_buffer_get_memory (buffer, 0);
+ ret = gst_memory_map (memory, &info, GST_MAP_READ);
+ fail_if (ret == FALSE);
- /*** First packet. **/
- buffer =
- create_rtp_packet_buffer (&rtp_header[0], rtp_header_len[0], orig_buffer,
- payload_offset[0], payload_len[0]);
- gst_buffer_list_add (list, buffer);
+ info.data[2] = (seqnum >> 8) & 0xff;
+ info.data[3] = seqnum & 0xff;
- /*** Second packet. ***/
- buffer =
- create_rtp_packet_buffer (&rtp_header[1], rtp_header_len[1], orig_buffer,
- payload_offset[1], payload_len[1]);
- gst_buffer_list_add (list, buffer);
+ tmp = (guint32 *) & (info.data[4]);
+ *tmp = g_htonl (timestamp);
- return list;
+ gst_memory_unmap (memory, &info);
+ gst_memory_unref (memory);
+
+ return buffer;
+}
+
+static void
+check_seqnum (GstBuffer * buffer, guint16 seqnum)
+{
+ GstMemory *memory;
+ GstMapInfo info;
+ gboolean ret;
+ guint16 current_seqnum;
+
+ fail_if (buffer == NULL);
+
+ memory = gst_buffer_get_memory (buffer, 0);
+ ret = gst_memory_map (memory, &info, GST_MAP_READ);
+ fail_if (ret == FALSE);
+
+ current_seqnum = info.data[2] << 8 | info.data[3];
+ fail_unless (current_seqnum == seqnum);
+
+ gst_memory_unmap (memory, &info);
+ gst_memory_unref (memory);
}
static void
* chain_list handler has been set.
*/
static gboolean chain_list_func_called;
+static guint chain_list_bytes_received;
+
+/* Create two packets with different payloads. */
+static GstBufferList *
+create_buffer_list (void)
+{
+ GstBufferList *list;
+ GstBuffer *orig_buffer;
+ GstBuffer *buffer;
+
+ orig_buffer = create_original_buffer ();
+ fail_if (orig_buffer == NULL);
+ list = gst_buffer_list_new ();
+ fail_if (list == NULL);
+
+ /*** First packet. **/
+ buffer =
+ create_rtp_packet_buffer (&rtp_header[0], rtp_header_len[0], orig_buffer,
+ payload_offset[0], payload_len[0]);
+ gst_buffer_list_add (list, buffer);
+
+ /*** Second packet. ***/
+ buffer =
+ create_rtp_packet_buffer (&rtp_header[1], rtp_header_len[1], orig_buffer,
+ payload_offset[1], payload_len[1]);
+ gst_buffer_list_add (list, buffer);
+
+ return list;
+}
+
+/* Check that the correct packets have been pushed out of the element. */
static GstFlowReturn
sink_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list)
{
fail_unless (GST_IS_BUFFER_LIST (list));
fail_unless (gst_buffer_list_length (list) == 2);
+ /* received bytes include lower level protocol overhead */
+ chain_list_bytes_received = gst_buffer_list_calculate_size (list) +
+ 2 * UDP_IP_HEADER_OVERHEAD;
+
fail_unless (gst_buffer_list_get (list, 0));
check_packet (list, 0, 0);
return GST_FLOW_OK;
}
+
+/* Non-consecutive seqnums makes probation fail. */
+static GstBufferList *
+create_buffer_list_fail_probation (void)
+{
+ GstBufferList *list;
+ GstBuffer *orig_buffer;
+ GstBuffer *buffer;
+
+ guint16 seqnums[] = { 1, 3, 5 };
+ guint i;
+
+ orig_buffer = create_original_buffer ();
+ fail_if (orig_buffer == NULL);
+
+ list = gst_buffer_list_new ();
+ fail_if (list == NULL);
+
+ for (i = 0; i < sizeof (seqnums) / sizeof (seqnums[0]); i++) {
+ buffer =
+ create_rtp_buffer_fields (&rtp_header[0], rtp_header_len[0],
+ orig_buffer, payload_offset[0], payload_len[0], seqnums[i], 0);
+ gst_buffer_list_add (list, buffer);
+ }
+
+ return list;
+}
+
+/* When probation fails this function shouldn't be called. */
+static GstFlowReturn
+sink_chain_list_probation_failed (GstPad * pad, GstObject * parent,
+ GstBufferList * list)
+{
+ chain_list_func_called = TRUE;
+ return GST_FLOW_OK;
+}
+
+
+/* After probation succeeds, a small gap in seqnums is allowed. */
+static GstBufferList *
+create_buffer_list_permissible_gap (void)
+{
+ GstBufferList *list;
+ GstBuffer *orig_buffer;
+ GstBuffer *buffer;
+
+ /* probation succeeds, but then there is a permissible out of sequence */
+ guint16 seqnums[] = { 1, 2, 4, 5 };
+ guint i;
+
+ orig_buffer = create_original_buffer ();
+ fail_if (orig_buffer == NULL);
+
+ list = gst_buffer_list_new ();
+ fail_if (list == NULL);
+
+ for (i = 0; i < sizeof (seqnums) / sizeof (seqnums[0]); i++) {
+ buffer =
+ create_rtp_buffer_fields (&rtp_header[0], rtp_header_len[0],
+ orig_buffer, payload_offset[0], payload_len[0], seqnums[i], 0);
+ gst_buffer_list_add (list, buffer);
+ }
+
+ return list;
+}
+
+/* All buffers should have been pushed. */
+static GstFlowReturn
+sink_chain_list_permissible_gap (GstPad * pad, GstObject * parent,
+ GstBufferList * list)
+{
+ GstBuffer *buffer;
+
+ chain_list_func_called = TRUE;
+
+ fail_unless (GST_IS_BUFFER_LIST (list));
+ fail_unless (gst_buffer_list_length (list) == 4);
+
+ /* Verify sequence numbers */
+ buffer = gst_buffer_list_get (list, 0);
+ check_seqnum (buffer, 1);
+
+ buffer = gst_buffer_list_get (list, 1);
+ check_seqnum (buffer, 2);
+
+ buffer = gst_buffer_list_get (list, 2);
+ check_seqnum (buffer, 4);
+
+ buffer = gst_buffer_list_get (list, 3);
+ check_seqnum (buffer, 5);
+
+ gst_buffer_list_unref (list);
+
+ return GST_FLOW_OK;
+}
+
+
+/* After probation succeeds, wrapping seqnum is allowed. */
+static GstBufferList *
+create_buffer_list_wrapping_seqnums (void)
+{
+ GstBufferList *list;
+ GstBuffer *orig_buffer;
+ GstBuffer *buffer;
+
+ /* probation succeeds, but then seqnum wraps around */
+ guint16 seqnums[] = { 65533, 65534, 65535, 0 };
+ guint i;
+
+ orig_buffer = create_original_buffer ();
+ fail_if (orig_buffer == NULL);
+
+ list = gst_buffer_list_new ();
+ fail_if (list == NULL);
+
+ for (i = 0; i < sizeof (seqnums) / sizeof (seqnums[0]); i++) {
+ buffer =
+ create_rtp_buffer_fields (&rtp_header[0], rtp_header_len[0],
+ orig_buffer, payload_offset[0], payload_len[0], seqnums[i], 0);
+ gst_buffer_list_add (list, buffer);
+ }
+
+ return list;
+}
+
+/* All buffers should have been pushed. */
+static GstFlowReturn
+sink_chain_list_wrapping_seqnums (GstPad * pad, GstObject * parent,
+ GstBufferList * list)
+{
+ GstBuffer *buffer;
+
+ chain_list_func_called = TRUE;
+
+ fail_unless (GST_IS_BUFFER_LIST (list));
+ fail_unless (gst_buffer_list_length (list) == 4);
+
+ /* Verify sequence numbers */
+ buffer = gst_buffer_list_get (list, 0);
+ check_seqnum (buffer, 65533);
+
+ buffer = gst_buffer_list_get (list, 1);
+ check_seqnum (buffer, 65534);
+
+ buffer = gst_buffer_list_get (list, 2);
+ check_seqnum (buffer, 65535);
+
+ buffer = gst_buffer_list_get (list, 3);
+ check_seqnum (buffer, 0);
+
+ gst_buffer_list_unref (list);
+
+ return GST_FLOW_OK;
+}
+
+
+/* Get the stats of the **first** source of the given type (get_sender) */
static void
-set_chain_function (GstPad * pad)
+get_source_stats (GstElement * rtpsession,
+ gboolean get_sender, GstStructure ** source_stats)
{
- gst_pad_set_chain_list_function (pad, GST_DEBUG_FUNCPTR (sink_chain_list));
+ GstStructure *stats;
+ GValueArray *stats_arr;
+ guint i;
+
+ g_object_get (rtpsession, "stats", &stats, NULL);
+ stats_arr =
+ g_value_get_boxed (gst_structure_get_value (stats, "source-stats"));
+ g_assert (stats_arr != NULL);
+ fail_unless (stats_arr->n_values >= 1);
+
+ *source_stats = NULL;
+ for (i = 0; i < stats_arr->n_values; i++) {
+ GstStructure *tmp_source_stats;
+ gboolean is_sender;
+
+ tmp_source_stats = g_value_dup_boxed (&stats_arr->values[i]);
+ gst_structure_get (tmp_source_stats, "is-sender", G_TYPE_BOOLEAN,
+ &is_sender, NULL);
+
+ /* Return the stats of the **first** source of the given type. */
+ if (is_sender == get_sender) {
+ *source_stats = tmp_source_stats;
+ break;
+ }
+ gst_structure_free (tmp_source_stats);
+ }
+
+ gst_structure_free (stats);
}
+/* Get the source stats given a session and a source type (get_sender) */
+static void
+get_session_source_stats (GstElement * rtpbin, guint session,
+ gboolean get_sender, GstStructure ** source_stats)
+{
+ GstElement *rtpsession;
+
+ g_signal_emit_by_name (rtpbin, "get-session", session, &rtpsession);
+ fail_if (rtpsession == NULL);
+
+ get_source_stats (rtpsession, get_sender, source_stats);
+
+ gst_object_unref (rtpsession);
+}
GST_START_TEST (test_bufferlist)
{
GstPad *sinkpad;
GstCaps *caps;
GstBufferList *list;
+ GstStructure *stats;
+ guint64 packets_sent;
+ guint64 packets_received;
list = create_buffer_list ();
fail_unless (list != NULL);
"send_rtp_src_0");
fail_if (sinkpad == NULL);
- set_chain_function (sinkpad);
+ gst_pad_set_chain_list_function (sinkpad,
+ GST_DEBUG_FUNCPTR (sink_chain_list));
gst_pad_set_active (srcpad, TRUE);
gst_pad_set_active (sinkpad, TRUE);
fail_unless (gst_pad_push_list (srcpad, list) == GST_FLOW_OK);
fail_if (chain_list_func_called == FALSE);
+ /* make sure that stats about the number of sent packets are OK too */
+ get_session_source_stats (rtpbin, 0, TRUE, &stats);
+ fail_if (stats == NULL);
+
+ gst_structure_get (stats,
+ "packets-sent", G_TYPE_UINT64, &packets_sent,
+ "packets-received", G_TYPE_UINT64, &packets_received, NULL);
+ fail_unless (packets_sent == 2);
+ fail_unless (packets_received == 2);
+ gst_structure_free (stats);
+
gst_pad_set_active (sinkpad, FALSE);
gst_pad_set_active (srcpad, FALSE);
GST_END_TEST;
+
+GST_START_TEST (test_bufferlist_recv)
+{
+ GstElement *rtpbin;
+ GstPad *srcpad;
+ GstPad *sinkpad;
+ GstCaps *caps;
+ GstBufferList *list;
+ GstStructure *stats;
+ guint64 bytes_received;
+ guint64 packets_received;
+
+ list = create_buffer_list ();
+ fail_unless (list != NULL);
+
+ rtpbin = gst_check_setup_element ("rtpsession");
+
+ srcpad =
+ gst_check_setup_src_pad_by_name (rtpbin, &srctemplate, "recv_rtp_sink");
+ fail_if (srcpad == NULL);
+
+ sinkpad =
+ gst_check_setup_sink_pad_by_name (rtpbin, &sinktemplate, "recv_rtp_src");
+ fail_if (sinkpad == NULL);
+
+ gst_pad_set_chain_list_function (sinkpad,
+ GST_DEBUG_FUNCPTR (sink_chain_list));
+
+ gst_pad_set_active (srcpad, TRUE);
+ gst_pad_set_active (sinkpad, TRUE);
+
+ caps = gst_caps_from_string (TEST_CAPS);
+ gst_check_setup_events (srcpad, rtpbin, caps, GST_FORMAT_TIME);
+ gst_caps_unref (caps);
+
+ gst_element_set_state (rtpbin, GST_STATE_PLAYING);
+
+ chain_list_func_called = FALSE;
+ fail_unless (gst_pad_push_list (srcpad, list) == GST_FLOW_OK);
+ fail_if (chain_list_func_called == FALSE);
+
+ /* make sure that stats about the number of received packets are OK too */
+ /* The source becomes a sender after probation succeeds, pass TRUE here. */
+ get_source_stats (rtpbin, TRUE, &stats);
+ fail_if (stats == NULL);
+
+ gst_structure_get (stats,
+ "bytes-received", G_TYPE_UINT64, &bytes_received,
+ "packets-received", G_TYPE_UINT64, &packets_received, NULL);
+ fail_unless (packets_received == 2);
+ fail_unless (bytes_received == chain_list_bytes_received);
+ gst_structure_free (stats);
+
+
+ gst_pad_set_active (sinkpad, FALSE);
+ gst_pad_set_active (srcpad, FALSE);
+
+ gst_check_teardown_pad_by_name (rtpbin, "recv_rtp_src");
+ gst_check_teardown_pad_by_name (rtpbin, "recv_rtp_sink");
+ gst_check_teardown_element (rtpbin);
+}
+
+GST_END_TEST;
+
+
+GST_START_TEST (test_bufferlist_recv_probation_failed)
+{
+ GstElement *rtpbin;
+ GstPad *srcpad;
+ GstPad *sinkpad;
+ GstCaps *caps;
+ GstBufferList *list;
+
+ list = create_buffer_list_fail_probation ();
+ fail_unless (list != NULL);
+
+ rtpbin = gst_check_setup_element ("rtpsession");
+
+ srcpad =
+ gst_check_setup_src_pad_by_name (rtpbin, &srctemplate, "recv_rtp_sink");
+ fail_if (srcpad == NULL);
+
+ sinkpad =
+ gst_check_setup_sink_pad_by_name (rtpbin, &sinktemplate, "recv_rtp_src");
+ fail_if (sinkpad == NULL);
+
+ gst_pad_set_chain_list_function (sinkpad,
+ GST_DEBUG_FUNCPTR (sink_chain_list_probation_failed));
+
+ gst_pad_set_active (srcpad, TRUE);
+ gst_pad_set_active (sinkpad, TRUE);
+
+ caps = gst_caps_from_string (TEST_CAPS);
+ gst_check_setup_events (srcpad, rtpbin, caps, GST_FORMAT_TIME);
+ gst_caps_unref (caps);
+
+ gst_element_set_state (rtpbin, GST_STATE_PLAYING);
+
+ chain_list_func_called = FALSE;
+ fail_unless (gst_pad_push_list (srcpad, list) == GST_FLOW_OK);
+ /* when probation fails the list should not be pushed at all, and the
+ * chain_list functions should not be called, fail if it has been. */
+ fail_if (chain_list_func_called == TRUE);
+
+ gst_pad_set_active (sinkpad, FALSE);
+ gst_pad_set_active (srcpad, FALSE);
+
+ gst_check_teardown_pad_by_name (rtpbin, "recv_rtp_src");
+ gst_check_teardown_pad_by_name (rtpbin, "recv_rtp_sink");
+ gst_check_teardown_element (rtpbin);
+}
+
+GST_END_TEST;
+
+
+GST_START_TEST (test_bufferlist_recv_permissible_gap)
+{
+ GstElement *rtpbin;
+ GstPad *srcpad;
+ GstPad *sinkpad;
+ GstCaps *caps;
+ GstBufferList *list;
+
+ list = create_buffer_list_permissible_gap ();
+ fail_unless (list != NULL);
+
+ rtpbin = gst_check_setup_element ("rtpsession");
+
+ srcpad =
+ gst_check_setup_src_pad_by_name (rtpbin, &srctemplate, "recv_rtp_sink");
+ fail_if (srcpad == NULL);
+
+ sinkpad =
+ gst_check_setup_sink_pad_by_name (rtpbin, &sinktemplate, "recv_rtp_src");
+ fail_if (sinkpad == NULL);
+
+ gst_pad_set_chain_list_function (sinkpad,
+ GST_DEBUG_FUNCPTR (sink_chain_list_permissible_gap));
+
+ gst_pad_set_active (srcpad, TRUE);
+ gst_pad_set_active (sinkpad, TRUE);
+
+ caps = gst_caps_from_string (TEST_CAPS);
+ gst_check_setup_events (srcpad, rtpbin, caps, GST_FORMAT_TIME);
+ gst_caps_unref (caps);
+
+ gst_element_set_state (rtpbin, GST_STATE_PLAYING);
+
+ chain_list_func_called = FALSE;
+ fail_unless (gst_pad_push_list (srcpad, list) == GST_FLOW_OK);
+ fail_if (chain_list_func_called == FALSE);
+
+ gst_pad_set_active (sinkpad, FALSE);
+ gst_pad_set_active (srcpad, FALSE);
+
+ gst_check_teardown_pad_by_name (rtpbin, "recv_rtp_src");
+ gst_check_teardown_pad_by_name (rtpbin, "recv_rtp_sink");
+ gst_check_teardown_element (rtpbin);
+}
+
+GST_END_TEST;
+
+
+GST_START_TEST (test_bufferlist_recv_wrapping_seqnums)
+{
+ GstElement *rtpbin;
+ GstPad *srcpad;
+ GstPad *sinkpad;
+ GstCaps *caps;
+ GstBufferList *list;
+
+ list = create_buffer_list_wrapping_seqnums ();
+ fail_unless (list != NULL);
+
+ rtpbin = gst_check_setup_element ("rtpsession");
+
+ srcpad =
+ gst_check_setup_src_pad_by_name (rtpbin, &srctemplate, "recv_rtp_sink");
+ fail_if (srcpad == NULL);
+
+ sinkpad =
+ gst_check_setup_sink_pad_by_name (rtpbin, &sinktemplate, "recv_rtp_src");
+ fail_if (sinkpad == NULL);
+
+ gst_pad_set_chain_list_function (sinkpad,
+ GST_DEBUG_FUNCPTR (sink_chain_list_wrapping_seqnums));
+
+ gst_pad_set_active (srcpad, TRUE);
+ gst_pad_set_active (sinkpad, TRUE);
+
+ caps = gst_caps_from_string (TEST_CAPS);
+ gst_check_setup_events (srcpad, rtpbin, caps, GST_FORMAT_TIME);
+ gst_caps_unref (caps);
+
+ gst_element_set_state (rtpbin, GST_STATE_PLAYING);
+
+ chain_list_func_called = FALSE;
+ fail_unless (gst_pad_push_list (srcpad, list) == GST_FLOW_OK);
+ fail_if (chain_list_func_called == FALSE);
+
+ gst_pad_set_active (sinkpad, FALSE);
+ gst_pad_set_active (srcpad, FALSE);
+
+ gst_check_teardown_pad_by_name (rtpbin, "recv_rtp_src");
+ gst_check_teardown_pad_by_name (rtpbin, "recv_rtp_sink");
+ gst_check_teardown_element (rtpbin);
+}
+
+GST_END_TEST;
+
+
static Suite *
bufferlist_suite (void)
{
suite_add_tcase (s, tc_chain);
tcase_add_test (tc_chain, test_bufferlist);
+ tcase_add_test (tc_chain, test_bufferlist_recv);
+ tcase_add_test (tc_chain, test_bufferlist_recv_probation_failed);
+ tcase_add_test (tc_chain, test_bufferlist_recv_permissible_gap);
+ tcase_add_test (tc_chain, test_bufferlist_recv_wrapping_seqnums);
return s;
}