From 880f49405051bca70ab3807444baa47c208f1021 Mon Sep 17 00:00:00 2001 From: Mikhail Fludkov Date: Thu, 7 Jul 2016 11:13:18 +0200 Subject: [PATCH] tests/rtprtx: refactor the tests to use gstharness The functionality of all the tests was kept exactly the same. Some tests were renamed: test_push_forward_seq -> test_rtxsend_rtxreceive test_drop_one_sender -> test_rtxsend_rtxreceive_with_packet_loss test_drop_multiple_sender -> test_multi_rtxsend_rtxreceive_with_packet_loss test_rtxreceive_data_reconstruction was testing that retransmitted buffer produced by rtxsend was correctly transformed to the original buffer by rtxreceive. Now we are checking for this in all the tests where both rtxsend & rtxreceive are involved. That's why the test was removed. --- tests/check/elements/rtprtx.c | 1898 ++++++++++------------------------------- 1 file changed, 443 insertions(+), 1455 deletions(-) diff --git a/tests/check/elements/rtprtx.c b/tests/check/elements/rtprtx.c index d3c5c6a..aaa1ac4 100644 --- a/tests/check/elements/rtprtx.c +++ b/tests/check/elements/rtprtx.c @@ -18,1422 +18,537 @@ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, * Boston, MA 02110-1301, USA. */ - #include -#include - +#include #include -/* For ease of programming we use globals to keep refs for our floating - * src and sink pads we create; otherwise we always have to do get_pad, - * get_peer, and then remove references in every test function */ -static GstPad *srcpad, *sinkpad; -/* we also have a list of src buffers */ -static GList *inbuffers = NULL; - -#define RTP_CAPS_STRING \ - "application/x-rtp, " \ - "media = (string)audio, " \ - "payload = (int) 0, " \ - "clock-rate = (int) 8000, " \ - "ssrc = (uint) 42, " \ - "encoding-name = (string)PCMU" - -#define RTP_FRAME_SIZE 20 - -static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", - GST_PAD_SINK, - GST_PAD_ALWAYS, - GST_STATIC_CAPS ("application/x-rtp") - ); -static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", - GST_PAD_SRC, - GST_PAD_ALWAYS, - GST_STATIC_CAPS ("application/x-rtp") - ); - -static void -setup_rtprtx (GstElement * rtprtxsend, GstElement * rtprtxreceive, - gint num_buffers) -{ - GstBuffer *buffer; - GstPad *sendsrcpad; - GstPad *receivesinkpad; - gboolean ret = FALSE; - - /* a 20 sample audio block (2,5 ms) generated with - * gst-launch audiotestsrc wave=silence blocksize=40 num-buffers=3 ! - * "audio/x-raw,channels=1,rate=8000" ! mulawenc ! rtppcmupay ! - * fakesink dump=1 - */ - guint8 in[] = { /* first 4 bytes are rtp-header, next 4 bytes are timestamp */ - 0x80, 0x80, 0x1c, 0x24, 0x46, 0xcd, 0xb7, 0x11, 0x3c, 0x3a, 0x7c, 0x5b, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff - }; - GstClockTime ts = G_GUINT64_CONSTANT (0); - GstClockTime tso = gst_util_uint64_scale (RTP_FRAME_SIZE, GST_SECOND, 8000); - gint i; - - srcpad = gst_check_setup_src_pad (rtprtxsend, &srctemplate); - sendsrcpad = gst_element_get_static_pad (rtprtxsend, "src"); - ret = gst_pad_set_active (srcpad, TRUE); - fail_if (ret == FALSE); - - sinkpad = gst_check_setup_sink_pad (rtprtxreceive, &sinktemplate); - receivesinkpad = gst_element_get_static_pad (rtprtxreceive, "sink"); - ret = gst_pad_set_active (sinkpad, TRUE); - fail_if (ret == FALSE); - - fail_if (gst_pad_link (sendsrcpad, receivesinkpad) != GST_PAD_LINK_OK); - - ret = gst_pad_set_active (sendsrcpad, TRUE); - fail_if (ret == FALSE); - ret = gst_pad_set_active (receivesinkpad, TRUE); - fail_if (ret == FALSE); - - gst_object_unref (sendsrcpad); - gst_object_unref (receivesinkpad); - - for (i = 0; i < num_buffers; i++) { - buffer = gst_buffer_new_and_alloc (sizeof (in)); - gst_buffer_fill (buffer, 0, in, sizeof (in)); - GST_BUFFER_DTS (buffer) = ts; - GST_BUFFER_PTS (buffer) = ts; - GST_BUFFER_DURATION (buffer) = tso; - GST_DEBUG ("created buffer: %p", buffer); - - /*if (!i) - GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); */ - - inbuffers = g_list_append (inbuffers, buffer); - - /* hackish way to update the rtp header */ - in[1] = 0x00; - in[3]++; /* seqnumber */ - in[7] += RTP_FRAME_SIZE; /* inc. timestamp with framesize */ - ts += tso; - } -} - -static GstStateChangeReturn -start_rtprtx (GstElement * element) -{ - GstStateChangeReturn ret; - - ret = gst_element_set_state (element, GST_STATE_PLAYING); - ck_assert_int_ne (ret, GST_STATE_CHANGE_FAILURE); +#define verify_buf(buf, is_rtx, expected_ssrc, expted_pt, expected_seqnum) \ + G_STMT_START { \ + GstRTPBuffer _rtp = GST_RTP_BUFFER_INIT; \ + fail_unless (gst_rtp_buffer_map (buf, GST_MAP_READ, &_rtp)); \ + fail_unless_equals_int (gst_rtp_buffer_get_ssrc (&_rtp), expected_ssrc); \ + fail_unless_equals_int (gst_rtp_buffer_get_payload_type (&_rtp), expted_pt); \ + if (!(is_rtx)) { \ + fail_unless_equals_int (gst_rtp_buffer_get_seq (&_rtp), expected_seqnum); \ + } else { \ + fail_unless_equals_int (GST_READ_UINT16_BE (gst_rtp_buffer_get_payload \ + (&_rtp)), expected_seqnum); \ + } \ + gst_rtp_buffer_unmap (&_rtp); \ + } G_STMT_END + +#define pull_and_verify(h, is_rtx, expected_ssrc, expted_pt, expected_seqnum) \ + G_STMT_START { \ + GstBuffer *_buf = gst_harness_pull (h); \ + verify_buf (_buf, is_rtx, expected_ssrc, expted_pt, expected_seqnum); \ + gst_buffer_unref (_buf); \ + } G_STMT_END + +#define push_pull_and_verify(h, buf, is_rtx, expected_ssrc, expted_pt, expected_seqnum) \ + G_STMT_START { \ + gst_harness_push (h, buf); \ + pull_and_verify (h, is_rtx, expected_ssrc, expted_pt, expected_seqnum); \ + } G_STMT_END - ret = gst_element_get_state (element, NULL, NULL, GST_CLOCK_TIME_NONE); - ck_assert_int_ne (ret, GST_STATE_CHANGE_FAILURE); - - return ret; -} - -static void -cleanup_rtprtx (GstElement * rtprtxsend, GstElement * rtprtxreceive) +static GstEvent * +create_rtx_event (guint32 ssrc, guint8 payload_type, guint16 seqnum) { - GST_DEBUG ("cleanup_rtprtx"); - - g_list_free (inbuffers); - inbuffers = NULL; - - gst_check_drop_buffers (); - gst_pad_set_active (srcpad, FALSE); - gst_check_teardown_src_pad (rtprtxsend); - gst_check_teardown_element (rtprtxsend); - - gst_pad_set_active (sinkpad, FALSE); - gst_check_teardown_sink_pad (rtprtxreceive); - gst_check_teardown_element (rtprtxreceive); + return gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, + gst_structure_new ("GstRTPRetransmissionRequest", + "seqnum", G_TYPE_UINT, seqnum, + "ssrc", G_TYPE_UINT, ssrc, + "payload-type", G_TYPE_UINT, payload_type, NULL)); } static void -check_rtprtx_results (GstElement * rtprtxsend, GstElement * rtprtxreceive, - gint num_buffers) +compare_rtp_packets (GstBuffer * a, GstBuffer * b) { - guint nbrtxrequests = 0; - guint nbrtxpackets = 0; - - g_object_get (G_OBJECT (rtprtxsend), "num-rtx-requests", &nbrtxrequests, - NULL); - fail_unless_equals_int (nbrtxrequests, 3); + GstRTPBuffer rtp_a = GST_RTP_BUFFER_INIT; + GstRTPBuffer rtp_b = GST_RTP_BUFFER_INIT; - g_object_get (G_OBJECT (rtprtxsend), "num-rtx-packets", &nbrtxpackets, NULL); - fail_unless_equals_int (nbrtxpackets, 3); + gst_rtp_buffer_map (a, GST_MAP_READ, &rtp_a); + gst_rtp_buffer_map (b, GST_MAP_READ, &rtp_b); - g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-requests", &nbrtxrequests, - NULL); - fail_unless_equals_int (nbrtxrequests, 3); + fail_unless_equals_int (gst_rtp_buffer_get_header_len (&rtp_a), + gst_rtp_buffer_get_header_len (&rtp_b)); + fail_unless_equals_int (gst_rtp_buffer_get_version (&rtp_a), + gst_rtp_buffer_get_version (&rtp_b)); + fail_unless_equals_int (gst_rtp_buffer_get_ssrc (&rtp_a), + gst_rtp_buffer_get_ssrc (&rtp_b)); + fail_unless_equals_int (gst_rtp_buffer_get_seq (&rtp_a), + gst_rtp_buffer_get_seq (&rtp_b)); + fail_unless_equals_int (gst_rtp_buffer_get_csrc_count (&rtp_a), + gst_rtp_buffer_get_csrc_count (&rtp_b)); + fail_unless_equals_int (gst_rtp_buffer_get_marker (&rtp_a), + gst_rtp_buffer_get_marker (&rtp_b)); + fail_unless_equals_int (gst_rtp_buffer_get_payload_type (&rtp_a), + gst_rtp_buffer_get_payload_type (&rtp_b)); + fail_unless_equals_int (gst_rtp_buffer_get_timestamp (&rtp_a), + gst_rtp_buffer_get_timestamp (&rtp_b)); + fail_unless_equals_int (gst_rtp_buffer_get_extension (&rtp_a), + gst_rtp_buffer_get_extension (&rtp_b)); - g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-packets", &nbrtxpackets, - NULL); - fail_unless_equals_int (nbrtxpackets, 3); + fail_unless_equals_int (gst_rtp_buffer_get_payload_len (&rtp_a), + gst_rtp_buffer_get_payload_len (&rtp_b)); + fail_unless_equals_int (memcmp (gst_rtp_buffer_get_payload (&rtp_a), + gst_rtp_buffer_get_payload (&rtp_b), + gst_rtp_buffer_get_payload_len (&rtp_a)), 0); - g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-assoc-packets", - &nbrtxpackets, NULL); - fail_unless_equals_int (nbrtxpackets, 3); + gst_rtp_buffer_unmap (&rtp_a); + gst_rtp_buffer_unmap (&rtp_b); } - -GST_START_TEST (test_push_forward_seq) +static GstBuffer * +create_rtp_buffer (guint32 ssrc, guint8 payload_type, guint16 seqnum) { - GstElement *rtprtxsend; - GstElement *rtprtxreceive; - const guint num_buffers = 4; - GList *node; - gint i = 0; - GstCaps *caps = NULL; - GstStructure *pt_map; + GstRTPBuffer rtpbuf = GST_RTP_BUFFER_INIT; + guint payload_size = 29; + guint64 timestamp = gst_util_uint64_scale_int (seqnum, 90000, 30); + GstBuffer *buf = gst_rtp_buffer_new_allocate (payload_size, 0, 0); - rtprtxsend = gst_check_setup_element ("rtprtxsend"); - rtprtxreceive = gst_check_setup_element ("rtprtxreceive"); - setup_rtprtx (rtprtxsend, rtprtxreceive, num_buffers); - GST_DEBUG ("setup_rtprtx"); - - fail_unless (start_rtprtx (rtprtxsend) - == GST_STATE_CHANGE_SUCCESS, "could not set to playing"); - - fail_unless (start_rtprtx (rtprtxreceive) - == GST_STATE_CHANGE_SUCCESS, "could not set to playing"); - - caps = gst_caps_from_string (RTP_CAPS_STRING); - gst_check_setup_events (srcpad, rtprtxsend, caps, GST_FORMAT_TIME); - gst_caps_unref (caps); - - pt_map = gst_structure_new ("application/x-rtp-pt-map", - "0", G_TYPE_UINT, 97, NULL); - g_object_set (rtprtxsend, "payload-type-map", pt_map, NULL); - g_object_set (rtprtxreceive, "payload-type-map", pt_map, NULL); - gst_structure_free (pt_map); - - /* push buffers: 0,1,2, */ - for (node = inbuffers; node; node = g_list_next (node)) { - GstEvent *event = NULL; - GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; - GstBuffer *buffer = (GstBuffer *) node->data; - GList *last_out_buffer; - guint64 end_time; - gboolean res; - - gst_buffer_ref (buffer); - fail_unless_equals_int (gst_pad_push (srcpad, buffer), GST_FLOW_OK); - - if (i < 3) { - gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp); - event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, - gst_structure_new ("GstRTPRetransmissionRequest", - "seqnum", G_TYPE_UINT, (guint) gst_rtp_buffer_get_seq (&rtp), - "ssrc", G_TYPE_UINT, (guint) gst_rtp_buffer_get_ssrc (&rtp), - "payload-type", G_TYPE_UINT, - (guint) gst_rtp_buffer_get_payload_type (&rtp), NULL)); - gst_rtp_buffer_unmap (&rtp); - - /* synchronize with the chain() function of the "sinkpad" - * to make sure that rtxsend has pushed the rtx buffer out - * before continuing */ - last_out_buffer = g_list_last (buffers); - g_mutex_lock (&check_mutex); - fail_unless (gst_pad_push_event (sinkpad, event)); - end_time = g_get_monotonic_time () + G_TIME_SPAN_SECOND; - do - res = g_cond_wait_until (&check_cond, &check_mutex, end_time); - while (res == TRUE && last_out_buffer == g_list_last (buffers)); - g_mutex_unlock (&check_mutex); - } - gst_buffer_unref (buffer); - ++i; - } - - /* check the buffer list */ - check_rtprtx_results (rtprtxsend, rtprtxreceive, num_buffers); - - /* cleanup */ - cleanup_rtprtx (rtprtxsend, rtprtxreceive); + gst_rtp_buffer_map (buf, GST_MAP_WRITE, &rtpbuf); + gst_rtp_buffer_set_ssrc (&rtpbuf, ssrc); + gst_rtp_buffer_set_payload_type (&rtpbuf, payload_type); + gst_rtp_buffer_set_seq (&rtpbuf, seqnum); + gst_rtp_buffer_set_timestamp (&rtpbuf, (guint32) timestamp); + memset (gst_rtp_buffer_get_payload (&rtpbuf), 0x29, payload_size); + gst_rtp_buffer_unmap (&rtpbuf); + return buf; } -GST_END_TEST; - -static void -message_received (GstBus * bus, GstMessage * message, gboolean * eos) +static GstBuffer * +create_rtp_buffer_with_timestamp (guint32 ssrc, guint8 payload_type, + guint16 seqnum, guint32 timestamp) { - GST_INFO ("bus message from \"%" GST_PTR_FORMAT "\": %" GST_PTR_FORMAT, - GST_MESSAGE_SRC (message), message); - - switch (message->type) { - case GST_MESSAGE_EOS: - *eos = TRUE; - break; - case GST_MESSAGE_WARNING:{ - GError *gerror; - gchar *debug; - - gst_message_parse_warning (message, &gerror, &debug); - gst_object_default_error (GST_MESSAGE_SRC (message), gerror, debug); - g_error_free (gerror); - g_free (debug); - break; - } - case GST_MESSAGE_ERROR:{ - GError *gerror; - gchar *debug; - - gst_message_parse_error (message, &gerror, &debug); - gst_object_default_error (GST_MESSAGE_SRC (message), gerror, debug); - fail ("Error: %s / %s", gerror->message, debug); - g_error_free (gerror); - g_free (debug); - break; - } - default: - break; - } + GstRTPBuffer rtpbuf = GST_RTP_BUFFER_INIT; + GstBuffer *buf = create_rtp_buffer (ssrc, payload_type, seqnum); + gst_rtp_buffer_map (buf, GST_MAP_WRITE, &rtpbuf); + gst_rtp_buffer_set_timestamp (&rtpbuf, timestamp); + gst_rtp_buffer_unmap (&rtpbuf); + return buf; } -typedef struct -{ - guint count; - guint nb_packets; - guint drop_every_n_packets; -} RTXSendData; - -typedef struct -{ - guint nb_packets; - guint seqnum_offset; - guint seqnum_prev; -} RTXReceiveData; - -static GstPadProbeReturn -do_buffer_list_as_buffers_probe (GstPad * pad, GstPadProbeInfo * info, - gpointer user_data, GstPadProbeCallback callback) +GST_START_TEST (test_rtxsend_rtxreceive) { - /* Iterate the buffer list, removing any items that we're - * told to drop and creating a new bufferlist. If all buffers - * are dropped, return DROP. - */ - guint i, len; - GstBufferList *list; - GstBufferList *outlist; - GstPadProbeInfo buf_info = *info; - - GST_INFO_OBJECT (pad, "probing each buffer in list individually"); - - list = gst_pad_probe_info_get_buffer_list (info); + const guint packets_num = 5; + guint master_ssrc = 1234567; + guint master_pt = 96; + guint rtx_pt = 99; + GstStructure *pt_map; + GstBuffer *inbufs[5]; + GstHarness *hrecv = gst_harness_new ("rtprtxreceive"); + GstHarness *hsend = gst_harness_new ("rtprtxsend"); - g_return_val_if_fail (list != NULL, GST_PAD_PROBE_REMOVE); + pt_map = gst_structure_new ("application/x-rtp-pt-map", + "96", G_TYPE_UINT, rtx_pt, NULL); + g_object_set (hrecv->element, "payload-type-map", pt_map, NULL); + g_object_set (hsend->element, "payload-type-map", pt_map, NULL); - len = gst_buffer_list_length (list); - outlist = gst_buffer_list_new_sized (len); + gst_harness_set_src_caps_str (hsend, "application/x-rtp, " + "media = (string)video, payload = (int)96, " + "ssrc = (uint)1234567, clock-rate = (int)90000, " + "encoding-name = (string)RAW"); + gst_harness_set_src_caps_str (hrecv, "application/x-rtp, " + "media = (string)video, payload = (int)96, " + "ssrc = (uint)1234567, clock-rate = (int)90000, " + "encoding-name = (string)RAW"); - buf_info.type = GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH; - for (i = 0; i < len; i++) { - GstBuffer *buffer = gst_buffer_list_get (list, i); - GstPadProbeReturn ret; - buf_info.data = buffer; - ret = callback (pad, &buf_info, user_data); - /* If the buffer wasn't dropped, add it to the output list */ - if (ret != GST_PAD_PROBE_DROP) - gst_buffer_list_insert (outlist, -1, gst_buffer_ref (buffer)); + /* Push 'packets_num' packets through rtxsend to rtxreceive */ + for (gint i = 0; i < packets_num; ++i) { + inbufs[i] = create_rtp_buffer (master_ssrc, master_pt, 100 + i); + gst_harness_push (hsend, gst_buffer_ref (inbufs[i])); + gst_harness_push (hrecv, gst_harness_pull (hsend)); + pull_and_verify (hrecv, FALSE, master_ssrc, master_pt, 100 + i); } - len = gst_buffer_list_length (outlist); - if (len == 0) { - /* Everything was discarded, drop our outlist */ - gst_buffer_list_unref (outlist); - return GST_PAD_PROBE_DROP; + /* Getting rid of reconfigure event. Preparation before the next step */ + gst_event_unref (gst_harness_pull_upstream_event (hrecv)); + fail_unless_equals_int (gst_harness_upstream_events_in_queue (hrecv), 0); + + /* Push 'packets_num' RTX events through rtxreceive to rtxsend. + Push RTX packets from rtxsend to rtxreceive and + check that the packet produced out of RTX packet is the same + as an original packet */ + for (gint i = 0; i < packets_num; ++i) { + GstBuffer *outbuf; + gst_harness_push_upstream_event (hrecv, + create_rtx_event (master_ssrc, master_pt, 100 + i)); + gst_harness_push_upstream_event (hsend, + gst_harness_pull_upstream_event (hrecv)); + gst_harness_push (hrecv, gst_harness_pull (hsend)); + + outbuf = gst_harness_pull (hrecv); + compare_rtp_packets (inbufs[i], outbuf); + gst_buffer_unref (inbufs[i]); + gst_buffer_unref (outbuf); } - /* Replace the original buffer list with the modified one */ - gst_buffer_list_unref (list); - info->data = outlist; - return GST_PAD_PROBE_OK; -} - -static GstPadProbeReturn -rtprtxsend_srcpad_probe (GstPad * pad, GstPadProbeInfo * info, - gpointer user_data) -{ - GstPadProbeReturn ret = GST_PAD_PROBE_OK; - - GST_LOG_OBJECT (pad, "here"); - if (info->type == (GST_PAD_PROBE_TYPE_BUFFER_LIST | GST_PAD_PROBE_TYPE_PUSH)) - return do_buffer_list_as_buffers_probe (pad, info, user_data, - rtprtxsend_srcpad_probe); - - if (info->type == (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH)) { - GstBuffer *buffer = GST_BUFFER (info->data); - RTXSendData *rtxdata = (RTXSendData *) user_data; - GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; - guint payload_type = 0; - - gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp); - payload_type = gst_rtp_buffer_get_payload_type (&rtp); - - /* main stream packets */ - if (payload_type == 96) { - /* count packets of the main stream */ - ++rtxdata->nb_packets; - /* drop some packets */ - if (rtxdata->count < rtxdata->drop_every_n_packets) { - ++rtxdata->count; - } else { - /* drop a packet every 'rtxdata->count' packets */ - rtxdata->count = 1; - ret = GST_PAD_PROBE_DROP; - } - } else { - /* retransmission packets */ - } - - gst_rtp_buffer_unmap (&rtp); + /* Check RTX stats */ + { + guint rtx_requests; + guint rtx_packets; + guint rtx_assoc_packets; + g_object_get (G_OBJECT (hsend->element), + "num-rtx-requests", &rtx_requests, + "num-rtx-packets", &rtx_packets, NULL); + fail_unless_equals_int (rtx_packets, packets_num); + fail_unless_equals_int (rtx_requests, packets_num); + + g_object_get (G_OBJECT (hrecv->element), + "num-rtx-requests", &rtx_requests, + "num-rtx-packets", &rtx_packets, + "num-rtx-assoc-packets", &rtx_assoc_packets, NULL); + fail_unless_equals_int (rtx_packets, packets_num); + fail_unless_equals_int (rtx_requests, packets_num); + fail_unless_equals_int (rtx_assoc_packets, packets_num); } - return ret; + gst_structure_free (pt_map); + gst_harness_teardown (hrecv); + gst_harness_teardown (hsend); } -static GstPadProbeReturn -rtprtxreceive_srcpad_probe (GstPad * pad, GstPadProbeInfo * info, - gpointer user_data) -{ - if (info->type == (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH)) { - GstBuffer *buffer = GST_BUFFER (info->data); - RTXReceiveData *rtxdata = (RTXReceiveData *) user_data; - GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; - guint seqnum = 0; - guint i = 0; - - gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp); - seqnum = gst_rtp_buffer_get_seq (&rtp); - - /* check if there is a dropped packet */ - if (seqnum > rtxdata->seqnum_prev + rtxdata->seqnum_offset) { - GstPad *peerpad = gst_pad_get_peer (pad); - - /* ask retransmission of missing packet */ - for (i = rtxdata->seqnum_prev + rtxdata->seqnum_offset; i < seqnum; - i += rtxdata->seqnum_offset) { - GstEvent *event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, - gst_structure_new ("GstRTPRetransmissionRequest", - "seqnum", G_TYPE_UINT, i, - "ssrc", G_TYPE_UINT, gst_rtp_buffer_get_ssrc (&rtp), - "payload-type", G_TYPE_UINT, - gst_rtp_buffer_get_payload_type (&rtp), - NULL)); - gst_pad_push_event (peerpad, event); - } - gst_object_unref (peerpad); - - rtxdata->seqnum_prev = seqnum; - } else if (seqnum == rtxdata->seqnum_prev + rtxdata->seqnum_offset) { - /* also update previous seqnum in this case */ - rtxdata->seqnum_prev = seqnum; - } - - gst_rtp_buffer_unmap (&rtp); - - ++rtxdata->nb_packets; - } - - return GST_PAD_PROBE_OK; -} +GST_END_TEST; -static void -start_test_drop_and_check_results (GstElement * bin, GstElement * rtppayloader, - GstElement * rtprtxsend, GstElement * rtprtxreceive, - RTXSendData * send_rtxdata, RTXReceiveData * receive_rtxdata, - guint drop_every_n_packets, gboolean * eos) +GST_START_TEST (test_rtxsend_rtxreceive_with_packet_loss) { - GstStateChangeReturn state_res = GST_STATE_CHANGE_FAILURE; - guint nbrtxrequests = 0; - guint nbrtxpackets = 0; - guint nb_expected_requests = 0; + guint packets_num = 20; + guint master_ssrc = 1234567; + guint master_pt = 96; + guint rtx_pt = 99; + guint seqnum = 100; + guint expected_rtx_packets = 0; GstStructure *pt_map; - - GST_INFO ("starting test"); + GstHarness *hrecv = gst_harness_new ("rtprtxreceive"); + GstHarness *hsend = gst_harness_new ("rtprtxsend"); pt_map = gst_structure_new ("application/x-rtp-pt-map", - "96", G_TYPE_UINT, 99, NULL); - g_object_set (rtppayloader, "pt", 96, NULL); - g_object_set (rtppayloader, "seqnum-offset", 1, NULL); - g_object_set (rtprtxsend, "payload-type-map", pt_map, NULL); - g_object_set (rtprtxreceive, "payload-type-map", pt_map, NULL); - gst_structure_free (pt_map); - - send_rtxdata->count = 1; - send_rtxdata->nb_packets = 0; - send_rtxdata->drop_every_n_packets = drop_every_n_packets; - - receive_rtxdata->nb_packets = 0; - receive_rtxdata->seqnum_offset = 0; - receive_rtxdata->seqnum_prev = 0; - - *eos = FALSE; - - /* retrieve offset before going to paused */ - g_object_get (G_OBJECT (rtppayloader), "seqnum-offset", - &receive_rtxdata->seqnum_offset, NULL); - - /* prepare playing */ - state_res = gst_element_set_state (bin, GST_STATE_PAUSED); - ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE); - - /* wait for completion */ - state_res = gst_element_get_state (bin, NULL, NULL, GST_CLOCK_TIME_NONE); - ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE); - - /* retrieve seqnum_prev here to make sure it has been reseted */ - g_object_get (G_OBJECT (rtppayloader), "seqnum", - &receive_rtxdata->seqnum_prev, NULL); - - /* run pipeline */ - state_res = gst_element_set_state (bin, GST_STATE_PLAYING); - ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE); - - GST_INFO ("running main loop"); - while (!*eos) - g_main_context_iteration (NULL, TRUE); - - /* check results */ - - if (send_rtxdata->nb_packets % drop_every_n_packets == 0) { - /* special case because the last buffer will be dropped - * so the receiver cannot know if it has been dropped (no next packet) - */ - nb_expected_requests = send_rtxdata->nb_packets / drop_every_n_packets - 1; - fail_unless_equals_int (send_rtxdata->nb_packets, - receive_rtxdata->nb_packets + 1); - } else { - nb_expected_requests = send_rtxdata->nb_packets / drop_every_n_packets; - fail_unless_equals_int (send_rtxdata->nb_packets, - receive_rtxdata->nb_packets); - } - - g_object_get (G_OBJECT (rtprtxsend), "num-rtx-requests", &nbrtxrequests, - NULL); - fail_unless_equals_int (nbrtxrequests, nb_expected_requests); - - g_object_get (G_OBJECT (rtprtxsend), "num-rtx-packets", &nbrtxpackets, NULL); - fail_unless_equals_int (nbrtxpackets, nb_expected_requests); + "96", G_TYPE_UINT, rtx_pt, NULL); + g_object_set (hrecv->element, "payload-type-map", pt_map, NULL); + g_object_set (hsend->element, "payload-type-map", pt_map, NULL); - g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-requests", &nbrtxrequests, - NULL); - fail_unless_equals_int (nbrtxrequests, nb_expected_requests); - - g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-packets", &nbrtxpackets, - NULL); - fail_unless_equals_int (nbrtxpackets, nb_expected_requests); + gst_harness_set_src_caps_str (hsend, "application/x-rtp, " + "media = (string)video, payload = (int)96, " + "ssrc = (uint)1234567, clock-rate = (int)90000, " + "encoding-name = (string)RAW"); + gst_harness_set_src_caps_str (hrecv, "application/x-rtp, " + "media = (string)video, payload = (int)96, " + "ssrc = (uint)1234567, clock-rate = (int)90000, " + "encoding-name = (string)RAW"); - g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-assoc-packets", - &nbrtxpackets, NULL); - fail_unless_equals_int (nbrtxpackets, nb_expected_requests); + /* Getting rid of reconfigure event. Making sure there is no upstream + events in the queue. Preparation step before the test. */ + gst_event_unref (gst_harness_pull_upstream_event (hrecv)); + fail_unless_equals_int (gst_harness_upstream_events_in_queue (hrecv), 0); + + /* Push 'packets_num' packets through rtxsend to rtxreceive loosing every + 'drop_every_n_packets' packet. When we loose the packet we send RTX event + through rtxreceive to rtxsend, and verify the packet was retransmitted */ + for (gint drop_nth_packet = 2; drop_nth_packet < 10; ++drop_nth_packet) { + for (gint i = 0; i < packets_num; ++i, ++seqnum) { + GstBuffer *outbuf; + GstBuffer *inbuf = create_rtp_buffer (master_ssrc, master_pt, seqnum); + gboolean drop_this_packet = ((i + 1) % drop_nth_packet) == 0; + + gst_harness_push (hsend, gst_buffer_ref (inbuf)); + if (drop_this_packet) { + /* Dropping original packet */ + gst_buffer_unref (gst_harness_pull (hsend)); + /* Requesting retransmission */ + gst_harness_push_upstream_event (hrecv, + create_rtx_event (master_ssrc, master_pt, seqnum)); + gst_harness_push_upstream_event (hsend, + gst_harness_pull_upstream_event (hrecv)); + /* Pushing RTX packet to rtxreceive */ + gst_harness_push (hrecv, gst_harness_pull (hsend)); + ++expected_rtx_packets; + } else { + gst_harness_push (hrecv, gst_harness_pull (hsend)); + } - state_res = gst_element_set_state (bin, GST_STATE_NULL); - ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE); -} + /* We making sure every buffer we pull is the same as original input + buffer */ + outbuf = gst_harness_pull (hrecv); + compare_rtp_packets (inbuf, outbuf); + gst_buffer_unref (inbuf); + gst_buffer_unref (outbuf); + + /* + We should not have any packets in the harness queue by this point. It + means rtxsend didn't send more packets than RTX events and rtxreceive + didn't produce more than one packet per RTX packet. + */ + fail_unless_equals_int (gst_harness_buffers_in_queue (hsend), 0); + fail_unless_equals_int (gst_harness_buffers_in_queue (hrecv), 0); + } + } -/* This test build the pipeline videotestsrc ! rtpvrawpay ! rtprtxsend ! rtprtxreceive ! fakesink - * and drop some buffer between rtprtxsend and rtprtxreceive - * Then it checks that every dropped packet has been re-sent and it checks that - * not too much requests has been sent. - */ -GST_START_TEST (test_drop_one_sender) -{ - GstElement *bin, *src, *rtppayloader, *rtprtxsend, *rtprtxreceive, *sink; - GstBus *bus; - gboolean res; - GstPad *srcpad, *sinkpad; - GstStreamConsistency *chk_1, *chk_2, *chk_3; - gint num_buffers = 20; - guint drop_every_n_packets = 0; - RTXSendData send_rtxdata; - RTXReceiveData receive_rtxdata; - gboolean eos = FALSE; - - GST_INFO ("preparing test"); - - /* build pipeline */ - bin = gst_pipeline_new ("pipeline"); - bus = gst_element_get_bus (bin); - gst_bus_add_signal_watch_full (bus, G_PRIORITY_HIGH); - - src = gst_element_factory_make ("videotestsrc", "src"); - g_object_set (src, "num-buffers", num_buffers, NULL); - rtppayloader = gst_element_factory_make ("rtpvrawpay", "rtppayloader"); - rtprtxsend = gst_element_factory_make ("rtprtxsend", "rtprtxsend"); - rtprtxreceive = gst_element_factory_make ("rtprtxreceive", "rtprtxreceive"); - sink = gst_element_factory_make ("fakesink", "sink"); - gst_bin_add_many (GST_BIN (bin), src, rtppayloader, rtprtxsend, rtprtxreceive, - sink, NULL); - - res = gst_element_link (src, rtppayloader); - fail_unless (res == TRUE, NULL); - res = gst_element_link (rtppayloader, rtprtxsend); - fail_unless (res == TRUE, NULL); - res = gst_element_link (rtprtxsend, rtprtxreceive); - fail_unless (res == TRUE, NULL); - res = gst_element_link (rtprtxreceive, sink); - fail_unless (res == TRUE, NULL); - - /* create consistency checkers for the pads */ - - srcpad = gst_element_get_static_pad (rtppayloader, "src"); - chk_1 = gst_consistency_checker_new (srcpad); - gst_object_unref (srcpad); - - srcpad = gst_element_get_static_pad (rtprtxsend, "src"); - gst_pad_add_probe (srcpad, - (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST | - GST_PAD_PROBE_TYPE_PUSH), - (GstPadProbeCallback) rtprtxsend_srcpad_probe, &send_rtxdata, NULL); - sinkpad = gst_pad_get_peer (srcpad); - fail_if (sinkpad == NULL); - chk_2 = gst_consistency_checker_new (sinkpad); - gst_object_unref (sinkpad); - gst_object_unref (srcpad); - - srcpad = gst_element_get_static_pad (rtprtxreceive, "src"); - gst_pad_add_probe (srcpad, - (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH), - (GstPadProbeCallback) rtprtxreceive_srcpad_probe, &receive_rtxdata, NULL); - sinkpad = gst_pad_get_peer (srcpad); - fail_if (sinkpad == NULL); - chk_3 = gst_consistency_checker_new (sinkpad); - gst_object_unref (sinkpad); - gst_object_unref (srcpad); - - g_signal_connect (bus, "message::error", (GCallback) message_received, NULL); - g_signal_connect (bus, "message::warning", (GCallback) message_received, - NULL); - g_signal_connect (bus, "message::eos", (GCallback) message_received, &eos); - - for (drop_every_n_packets = 2; drop_every_n_packets < 10; - drop_every_n_packets++) { - start_test_drop_and_check_results (bin, rtppayloader, rtprtxsend, - rtprtxreceive, &send_rtxdata, &receive_rtxdata, drop_every_n_packets, - &eos); + /* Check RTX stats */ + { + guint rtx_requests; + guint rtx_packets; + guint rtx_assoc_packets; + g_object_get (G_OBJECT (hsend->element), + "num-rtx-requests", &rtx_requests, + "num-rtx-packets", &rtx_packets, NULL); + fail_unless_equals_int (rtx_packets, expected_rtx_packets); + fail_unless_equals_int (rtx_requests, expected_rtx_packets); + + g_object_get (G_OBJECT (hrecv->element), + "num-rtx-requests", &rtx_requests, + "num-rtx-packets", &rtx_packets, + "num-rtx-assoc-packets", &rtx_assoc_packets, NULL); + fail_unless_equals_int (rtx_packets, expected_rtx_packets); + fail_unless_equals_int (rtx_requests, expected_rtx_packets); + fail_unless_equals_int (rtx_assoc_packets, expected_rtx_packets); } - /* cleanup */ - gst_consistency_checker_free (chk_1); - gst_consistency_checker_free (chk_2); - gst_consistency_checker_free (chk_3); - gst_bus_remove_signal_watch (bus); - gst_object_unref (bus); - gst_object_unref (bin); + gst_structure_free (pt_map); + gst_harness_teardown (hrecv); + gst_harness_teardown (hsend); } GST_END_TEST; -static void -message_received_multiple (GstBus * bus, GstMessage * message, gpointer data) -{ - GST_INFO ("bus message from \"%" GST_PTR_FORMAT "\": %" GST_PTR_FORMAT, - GST_MESSAGE_SRC (message), message); - - switch (message->type) { - case GST_MESSAGE_WARNING:{ - GError *gerror; - gchar *debug; - - gst_message_parse_warning (message, &gerror, &debug); - gst_object_default_error (GST_MESSAGE_SRC (message), gerror, debug); - g_error_free (gerror); - g_free (debug); - break; - } - case GST_MESSAGE_ERROR:{ - GError *gerror; - gchar *debug; - - gst_message_parse_error (message, &gerror, &debug); - gst_object_default_error (GST_MESSAGE_SRC (message), gerror, debug); - fail ("Error: %s / %s", gerror->message, debug); - g_error_free (gerror); - g_free (debug); - break; - } - default: - break; - } -} - typedef struct { - guint count; - guint nb_packets; - guint drop_every_n_packets; - guint payload_type_master; - guint total_packets; -} RTXSendMultipleData; - -/* drop some packets */ -static GstPadProbeReturn -rtprtxsend_srcpad_probe_multiple (GstPad * pad, GstPadProbeInfo * info, - gpointer user_data) -{ - GstPadProbeReturn ret = GST_PAD_PROBE_OK; - - if (info->type == (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH)) { - GstBuffer *buffer = GST_BUFFER (info->data); - RTXSendMultipleData *rtxdata = (RTXSendMultipleData *) user_data; - GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; - guint payload_type = 0; - - gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp); - payload_type = gst_rtp_buffer_get_payload_type (&rtp); - - /* main stream packets */ - if (payload_type == rtxdata->payload_type_master) { - /* count packets of the main stream */ - ++rtxdata->nb_packets; - /* drop some packets */ - /* but make sure we never drop the last one, otherwise there - * will be nothing to trigger a retransmission. - */ - if (rtxdata->count < rtxdata->drop_every_n_packets || - rtxdata->nb_packets == rtxdata->total_packets) { - ++rtxdata->count; - } else { - /* drop a packet every 'rtxdata->count' packets */ - rtxdata->count = 1; - ret = GST_PAD_PROBE_DROP; - } - } else { - /* retransmission packets */ - } - - gst_rtp_buffer_unmap (&rtp); + GstHarness *h; + guint master_ssrc; + guint master_pt; + guint rtx_ssrc; + guint rtx_pt; + guint seqnum; + guint expected_rtx_packets; +} RtxSender; + +static GstStructure * +create_rtxsenders (RtxSender * senders, guint senders_num) +{ + GstStructure *recv_pt_map = + gst_structure_new_empty ("application/x-rtp-pt-map"); + + for (gint i = 0; i < senders_num; ++i) { + gchar *master_pt_str; + gchar *master_caps_str; + GstStructure *send_pt_map; + + senders[i].h = gst_harness_new ("rtprtxsend"); + senders[i].master_ssrc = 1234567 + i; + senders[i].rtx_ssrc = 7654321 + i; + senders[i].master_pt = 80 + i; + senders[i].rtx_pt = 20 + i; + senders[i].seqnum = i * 1000; + senders[i].expected_rtx_packets = 0; + + master_pt_str = g_strdup_printf ("%u", senders[i].master_pt); + master_caps_str = g_strdup_printf ("application/x-rtp, " + "media = (string)video, payload = (int)%u, " + "ssrc = (uint)%u, clock-rate = (int)90000, " + "encoding-name = (string)RAW", + senders[i].master_pt, senders[i].master_ssrc); + + send_pt_map = gst_structure_new ("application/x-rtp-pt-map", + master_pt_str, G_TYPE_UINT, senders[i].rtx_pt, NULL); + gst_structure_set (recv_pt_map, + master_pt_str, G_TYPE_UINT, senders[i].rtx_pt, NULL); + + g_object_set (senders[i].h->element, "payload-type-map", send_pt_map, NULL); + gst_harness_set_src_caps_str (senders[i].h, master_caps_str); + + gst_structure_free (send_pt_map); + g_free (master_pt_str); + g_free (master_caps_str); } - - return ret; + return recv_pt_map; } -/* make sure every sources has sent all their buffers */ -static GstPadProbeReturn -source_srcpad_probe_multiple_drop_eos (GstPad * pad, GstPadProbeInfo * info, - gpointer user_data) +static guint +check_rtxsenders_stats_and_teardown (RtxSender * senders, guint senders_num) { - GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info); + guint total_pakets_num = 0; + for (gint i = 0; i < senders_num; ++i) { + guint rtx_requests; + guint rtx_packets; + g_object_get (G_OBJECT (senders[i].h->element), + "num-rtx-requests", &rtx_requests, + "num-rtx-packets", &rtx_packets, NULL); + fail_unless_equals_int (rtx_packets, senders[i].expected_rtx_packets); + fail_unless_equals_int (rtx_requests, senders[i].expected_rtx_packets); + total_pakets_num += rtx_packets; - if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) - return GST_PAD_PROBE_DROP; - else - return GST_PAD_PROBE_OK; + gst_harness_teardown (senders[i].h); + } + return total_pakets_num; } -typedef struct +GST_START_TEST (test_multi_rtxsend_rtxreceive_with_packet_loss) { - GHashTable *ssrc_to_nb_packets_map; - GHashTable *ssrc_to_seqnum_offset_map; - guint seqnum_offset; - - gint to_send; - volatile gint dropped_requests; - volatile gint received; - gboolean request_passed; -} RTXReceiveMultipleData; - -/* add one branch videotestsrc ! rtpvrawpay ! rtprtxsend ! queue ! funnel. */ -static RTXSendMultipleData * -add_sender (GstElement * bin, const gchar * src_name, - const gchar * payloader_name, guint payload_type_master, - guint payload_type_aux, RTXReceiveMultipleData * rtxdata) -{ - GstElement *src = NULL; - GstCaps *caps; - GstElement *rtppayloader = NULL; - GstElement *rtprtxsend = NULL; - GstElement *queue = NULL; - GstElement *funnel = NULL; - GstPad *srcpad = NULL; - gboolean res = FALSE; - RTXSendMultipleData *send_rtxdata = g_slice_new0 (RTXSendMultipleData); - gchar *pt_master; + guint senders_num = 5; + guint packets_num = 10; + guint total_pakets_num = senders_num * packets_num; + guint total_dropped_packets = 0; + RtxSender senders[5]; GstStructure *pt_map; + GstHarness *hrecv = gst_harness_new ("rtprtxreceive"); - send_rtxdata->count = 1; - send_rtxdata->nb_packets = 0; - send_rtxdata->drop_every_n_packets = 0; - send_rtxdata->payload_type_master = payload_type_master; - send_rtxdata->total_packets = 25; - rtxdata->to_send += send_rtxdata->total_packets; - - src = gst_element_factory_make (src_name, NULL); - rtppayloader = gst_element_factory_make (payloader_name, NULL); - rtprtxsend = gst_element_factory_make ("rtprtxsend", NULL); - queue = gst_element_factory_make ("queue", NULL); - funnel = gst_bin_get_by_name (GST_BIN (bin), "funnel"); - - pt_master = g_strdup_printf ("%" G_GUINT32_FORMAT, payload_type_master); - pt_map = gst_structure_new ("application/x-rtp-pt-map", - pt_master, G_TYPE_UINT, payload_type_aux, NULL); - g_free (pt_master); - - g_object_set (src, "num-buffers", send_rtxdata->total_packets, NULL); - g_object_set (src, "is-live", TRUE, NULL); - g_object_set (rtppayloader, "pt", payload_type_master, NULL); - g_object_set (rtppayloader, "seqnum-offset", 1, NULL); - g_object_set (rtprtxsend, "payload-type-map", pt_map, NULL); - /* we want that every drop packet be resent fast */ - g_object_set (queue, "max-size-buffers", 1, NULL); - g_object_set (queue, "flush-on-eos", FALSE, NULL); - - gst_structure_free (pt_map); - - gst_bin_add_many (GST_BIN (bin), src, rtppayloader, rtprtxsend, queue, NULL); - - /* Make sure we have one buffer per frame, makes it easier to count! */ - caps = - gst_caps_from_string ("video/x-raw, width=20, height=10, framerate=30/1"); - res = gst_element_link_filtered (src, rtppayloader, caps); - gst_caps_unref (caps); - fail_unless (res == TRUE, NULL); - res = gst_element_link (rtppayloader, rtprtxsend); - fail_unless (res == TRUE, NULL); - res = gst_element_link (rtprtxsend, queue); - fail_unless (res == TRUE, NULL); - res = gst_element_link (queue, funnel); - fail_unless (res == TRUE, NULL); - gst_object_unref (funnel); - - /* to drop some packets */ - srcpad = gst_element_get_static_pad (rtprtxsend, "src"); - gst_pad_add_probe (srcpad, - (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH), - (GstPadProbeCallback) rtprtxsend_srcpad_probe_multiple, send_rtxdata, - NULL); - gst_object_unref (srcpad); - - /* to make sure every sources has sent all their buffers */ - srcpad = gst_element_get_static_pad (src, "src"); - gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, - (GstPadProbeCallback) source_srcpad_probe_multiple_drop_eos, NULL, NULL); - gst_object_unref (srcpad); - - return send_rtxdata; -} - -static GstPadProbeReturn -rtprtxreceive_sinkpad_probe_check_drop (GstPad * pad, GstPadProbeInfo * info, - gpointer user_data) -{ - GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info); - RTXReceiveMultipleData *rtxdata = (RTXReceiveMultipleData *) user_data; - - if (GST_EVENT_TYPE (event) == GST_EVENT_CUSTOM_UPSTREAM && - gst_event_get_structure (event) != NULL && - gst_structure_has_name (gst_event_get_structure (event), - "GstRTPRetransmissionRequest")) - rtxdata->request_passed = TRUE; - - return GST_PAD_PROBE_OK; -} - -static gboolean -check_finished (RTXReceiveMultipleData * rtxdata) -{ - return (g_atomic_int_get (&rtxdata->received) >= (rtxdata->to_send - - g_atomic_int_get (&rtxdata->dropped_requests))); -} - -static GstPadProbeReturn -rtprtxreceive_srcpad_probe_multiple (GstPad * pad, GstPadProbeInfo * info, - gpointer user_data) -{ - if (info->type == (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH)) { - GstBuffer *buffer = GST_BUFFER (info->data); - RTXReceiveMultipleData *rtxdata = (RTXReceiveMultipleData *) user_data; - GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; - guint ssrc = 0; - guint seqnum = 0; - gpointer seqnum_prev = 0; - guint nb_packets = 0; - - gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp); - ssrc = gst_rtp_buffer_get_ssrc (&rtp); - seqnum = gst_rtp_buffer_get_seq (&rtp); - - g_atomic_int_inc (&rtxdata->received); - if (check_finished (rtxdata)) - g_main_context_wakeup (NULL); - - if (!g_hash_table_lookup_extended (rtxdata->ssrc_to_seqnum_offset_map, - GUINT_TO_POINTER (ssrc), NULL, &seqnum_prev)) { - /*In our test we take care to never drop the first buffer */ - g_hash_table_insert (rtxdata->ssrc_to_seqnum_offset_map, - GUINT_TO_POINTER (ssrc), GUINT_TO_POINTER (seqnum)); - g_hash_table_insert (rtxdata->ssrc_to_nb_packets_map, - GUINT_TO_POINTER (ssrc), GUINT_TO_POINTER (1)); - gst_rtp_buffer_unmap (&rtp); - return GST_PAD_PROBE_OK; - } - + pt_map = create_rtxsenders (senders, 5); + g_object_set (hrecv->element, "payload-type-map", pt_map, NULL); + gst_harness_set_src_caps_str (hrecv, "application/x-rtp, " + "media = (string)video, payload = (int)80, " + "ssrc = (uint)1234567, clock-rate = (int)90000, " + "encoding-name = (string)RAW"); - /* check if there is a dropped packet - * (in our test every packet arrived in increasing order) */ - if (seqnum > GPOINTER_TO_UINT (seqnum_prev) + rtxdata->seqnum_offset) { - GstPad *peerpad = gst_pad_get_peer (pad); - guint i = 0; - - /* ask retransmission of missing packets */ - for (i = GPOINTER_TO_UINT (seqnum_prev) + rtxdata->seqnum_offset; - i < seqnum; i += rtxdata->seqnum_offset) { - GstEvent *event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, - gst_structure_new ("GstRTPRetransmissionRequest", - "seqnum", G_TYPE_UINT, i, - "ssrc", G_TYPE_UINT, gst_rtp_buffer_get_ssrc (&rtp), - "payload-type", G_TYPE_UINT, - gst_rtp_buffer_get_payload_type (&rtp), - NULL)); - rtxdata->request_passed = FALSE; - gst_pad_push_event (peerpad, event); - if (!rtxdata->request_passed) { - g_atomic_int_inc (&rtxdata->dropped_requests); - if (check_finished (rtxdata)) - g_main_context_wakeup (NULL); - } + /* Getting rid of reconfigure event. Making sure there is no upstream + events in the queue. Preparation step before the test. */ + gst_event_unref (gst_harness_pull_upstream_event (hrecv)); + fail_unless_equals_int (gst_harness_upstream_events_in_queue (hrecv), 0); + + /* We are going to push the 1st packet from the 1st sender, 2nd from the 2nd, + 3rd from the 3rd, etc. until all the senders will push 'packets_num' packets. + We will drop every 'drop_nth_packet' packet and request its retransmission + from all the senders. Because only one of them can produce RTX packet. + We need to make sure that all other senders will ignore the RTX event they + can't act upon. + */ + for (gint drop_nth_packet = 2; drop_nth_packet < 5; ++drop_nth_packet) { + for (gint i = 0; i < total_pakets_num; ++i) { + RtxSender *sender = &senders[i % senders_num]; + gboolean drop_this_packet = ((i + 1) % drop_nth_packet) == 0; + GstBuffer *outbuf, *inbuf; + inbuf = + create_rtp_buffer (sender->master_ssrc, sender->master_pt, + sender->seqnum); + + gst_harness_push (sender->h, gst_buffer_ref (inbuf)); + if (drop_this_packet) { + GstEvent *rtxevent; + /* Dropping original packet */ + gst_buffer_unref (gst_harness_pull (sender->h)); + + /* Pushing RTX event through rtxreceive to all the senders */ + gst_harness_push_upstream_event (hrecv, + create_rtx_event (sender->master_ssrc, sender->master_pt, + sender->seqnum)); + rtxevent = gst_harness_pull_upstream_event (hrecv); + + /* ... to all the senders */ + for (gint j = 0; j < senders_num; ++j) + gst_harness_push_upstream_event (senders[j].h, + gst_event_ref (rtxevent)); + gst_event_unref (rtxevent); + + /* Pushing RTX packet to rtxreceive */ + gst_harness_push (hrecv, gst_harness_pull (sender->h)); + ++sender->expected_rtx_packets; + ++total_dropped_packets; + } else { + gst_harness_push (hrecv, gst_harness_pull (sender->h)); } - gst_object_unref (peerpad); - - g_hash_table_insert (rtxdata->ssrc_to_seqnum_offset_map, - GUINT_TO_POINTER (ssrc), GUINT_TO_POINTER (seqnum)); - } else if (seqnum == - GPOINTER_TO_UINT (seqnum_prev) + rtxdata->seqnum_offset) { - /* also update previous seqnum in this case */ - g_hash_table_insert (rtxdata->ssrc_to_seqnum_offset_map, - GUINT_TO_POINTER (ssrc), GUINT_TO_POINTER (seqnum)); - } else { - /* receive retransmited packet */ - } - - gst_rtp_buffer_unmap (&rtp); - nb_packets = - GPOINTER_TO_UINT (g_hash_table_lookup (rtxdata->ssrc_to_nb_packets_map, - GUINT_TO_POINTER (ssrc))); - g_hash_table_insert (rtxdata->ssrc_to_nb_packets_map, - GUINT_TO_POINTER (ssrc), GUINT_TO_POINTER (++nb_packets)); - } - - return GST_PAD_PROBE_OK; -} - -static void -reset_rtx_send_data (RTXSendMultipleData * send_rtxdata, gpointer data) -{ - send_rtxdata->count = 1; - send_rtxdata->nb_packets = 0; - send_rtxdata->drop_every_n_packets = *(guint *) data; -} - -/* compute number of all packets sent by all sender */ -static void -compute_total_packets_sent (RTXSendMultipleData * send_rtxdata, gpointer data) -{ - guint *sum = (guint *) data; - *sum += send_rtxdata->nb_packets; -} - -/* compute number of all packets received by rtprtxreceive::src pad */ -static void -compute_total_packets_received (gpointer key, gpointer value, gpointer data) -{ - guint *sum = (guint *) data; - *sum += GPOINTER_TO_UINT (value); -} + /* It should not matter whether the buffer was dropped (and retransmitted) + or it went straight through rtxsend to rtxreceive. We should always pull + the same buffer that was pushed */ + outbuf = gst_harness_pull (hrecv); + compare_rtp_packets (inbuf, outbuf); + gst_buffer_unref (inbuf); + gst_buffer_unref (outbuf); + + /* + We should not have any packets in the harness queue by this point. It + means our senders didn't produce the packets for the unknown RTX event. + */ + for (gint j = 0; j < senders_num; ++j) + fail_unless_equals_int (gst_harness_buffers_in_queue (senders[j].h), 0); -static void -start_test_drop_multiple_and_check_results (GstElement * bin, - GList * send_rtxdata_list, RTXReceiveMultipleData * receive_rtxdata, - guint drop_every_n_packets) -{ - GstStateChangeReturn state_res = GST_STATE_CHANGE_FAILURE; - GstElement *rtprtxreceive = - gst_bin_get_by_name (GST_BIN (bin), "rtprtxreceive"); - guint sum_all_packets_sent = 0; - guint sum_rtx_packets_sent = 0; - guint sum_all_packets_received = 0; - guint sum_rtx_packets_received = 0; - guint sum_rtx_assoc_packets_received = 0; - guint sum_rtx_dropped_packets_received = 0; - gdouble error_sent_recv = 0; - GstIterator *itr_elements = NULL; - gboolean done = FALSE; - GValue item = { 0 }; - GstElement *element = NULL; - gchar *name = NULL; - - GST_INFO ("starting test"); - - g_atomic_int_set (&receive_rtxdata->received, 0); - g_atomic_int_set (&receive_rtxdata->dropped_requests, 0); - - g_hash_table_remove_all (receive_rtxdata->ssrc_to_nb_packets_map); - g_hash_table_remove_all (receive_rtxdata->ssrc_to_seqnum_offset_map); - - g_list_foreach (send_rtxdata_list, (GFunc) reset_rtx_send_data, - &drop_every_n_packets); - - /* run pipeline */ - state_res = gst_element_set_state (bin, GST_STATE_PLAYING); - ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE); - - state_res = gst_element_get_state (bin, NULL, NULL, GST_CLOCK_TIME_NONE); - ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE); - - GST_INFO ("running main loop"); - while (!check_finished (receive_rtxdata)) - g_main_context_iteration (NULL, TRUE); - - /* check results */ - itr_elements = gst_bin_iterate_elements (GST_BIN (bin)); - done = FALSE; - while (!done) { - switch (gst_iterator_next (itr_elements, &item)) { - case GST_ITERATOR_OK: - element = GST_ELEMENT (g_value_get_object (&item)); - name = gst_element_get_name (element); - if (g_str_has_prefix (name, "rtprtxsend") > 0) { - guint nb_packets = 0; - g_object_get (G_OBJECT (element), "num-rtx-packets", &nb_packets, - NULL); - sum_rtx_packets_sent += nb_packets; - } - g_free (name); - g_value_reset (&item); - break; - case GST_ITERATOR_RESYNC: - gst_iterator_resync (itr_elements); - break; - case GST_ITERATOR_ERROR: - done = TRUE; - break; - case GST_ITERATOR_DONE: - done = TRUE; - break; + ++sender->seqnum; } } - g_value_unset (&item); - gst_iterator_free (itr_elements); - - /* compute number of all packets sent by all sender */ - g_list_foreach (send_rtxdata_list, (GFunc) compute_total_packets_sent, - &sum_all_packets_sent); - - /* compute number of all packets received by rtprtxreceive::src pad */ - g_hash_table_foreach (receive_rtxdata->ssrc_to_nb_packets_map, - compute_total_packets_received, (gpointer) & sum_all_packets_received); - sum_all_packets_received += - g_atomic_int_get (&receive_rtxdata->dropped_requests); - fail_if (sum_all_packets_sent < sum_all_packets_received); - - /* some packet are not received, I still have to figure out why - * but I suspect it comes from pipeline setup/shutdown - */ - if (sum_all_packets_sent != sum_all_packets_received) { - error_sent_recv = - 1 - sum_all_packets_received / (gdouble) sum_all_packets_sent; - fail_if (error_sent_recv > 0.30); - /* it should be 0% */ + /* Check RTX stats */ + { + guint total_rtx_packets; + guint rtx_requests; + guint rtx_packets; + guint rtx_assoc_packets; + + total_rtx_packets = + check_rtxsenders_stats_and_teardown (senders, senders_num); + fail_unless_equals_int (total_rtx_packets, total_dropped_packets); + + g_object_get (G_OBJECT (hrecv->element), + "num-rtx-requests", &rtx_requests, + "num-rtx-packets", &rtx_packets, + "num-rtx-assoc-packets", &rtx_assoc_packets, NULL); + fail_unless_equals_int (rtx_packets, total_rtx_packets); + fail_unless_equals_int (rtx_requests, total_rtx_packets); + fail_unless_equals_int (rtx_assoc_packets, total_rtx_packets); } - /* retrieve number of retransmit packets received by rtprtxreceive */ - g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-packets", - &sum_rtx_packets_received, NULL); - - /* some of rtx packet are not received because the receiver avoids - * collision (= requests that have the same seqnum) - */ - fail_if (sum_rtx_packets_sent < sum_rtx_packets_received); - g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-assoc-packets", - &sum_rtx_assoc_packets_received, NULL); - sum_rtx_dropped_packets_received = - sum_rtx_packets_received - sum_rtx_assoc_packets_received; - fail_unless_equals_int (sum_rtx_packets_sent, - sum_rtx_assoc_packets_received + sum_rtx_dropped_packets_received); - - gst_object_unref (rtprtxreceive); - state_res = gst_element_set_state (bin, GST_STATE_NULL); - ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE); -} - -static void -free_rtx_send_data (gpointer data) -{ - g_slice_free (RTXSendMultipleData, data); -} - -/* This test build the pipeline funnel name=funnel - * videotestsrc ! rtpvrawpay ! rtprtxsend ! queue ! funnel. - * videotestsrc ! rtpvrawpay ! rtprtxsend ! queue ! funnel. - * N - * funnel. ! rtprtxreceive ! fakesink - * and drop some buffer just after each rtprtxsend - * Then it checks that every dropped packet has been re-sent and it checks - * that not too much requests has been sent. - */ -GST_START_TEST (test_drop_multiple_sender) -{ - GstElement *bin, *funnel, *rtprtxreceive, *sink; - GstBus *bus; - gboolean res; - GstPad *srcpad, *sinkpad; - guint drop_every_n_packets = 0; - GList *send_rtxdata_list = NULL; - RTXReceiveMultipleData receive_rtxdata = { NULL }; - GstStructure *pt_map; - - GST_INFO ("preparing test"); - - receive_rtxdata.ssrc_to_nb_packets_map = - g_hash_table_new (g_direct_hash, g_direct_equal); - receive_rtxdata.ssrc_to_seqnum_offset_map = - g_hash_table_new (g_direct_hash, g_direct_equal); - receive_rtxdata.seqnum_offset = 1; - - /* build pipeline */ - bin = gst_pipeline_new ("pipeline"); - bus = gst_element_get_bus (bin); - gst_bus_add_signal_watch_full (bus, G_PRIORITY_HIGH); - - funnel = gst_element_factory_make ("funnel", "funnel"); - rtprtxreceive = gst_element_factory_make ("rtprtxreceive", "rtprtxreceive"); - sink = gst_element_factory_make ("fakesink", "sink"); - g_object_set (sink, "sync", TRUE, NULL); - g_object_set (sink, "qos", FALSE, NULL); - gst_bin_add_many (GST_BIN (bin), funnel, rtprtxreceive, sink, NULL); - - send_rtxdata_list = - g_list_append (send_rtxdata_list, add_sender (bin, "videotestsrc", - "rtpvrawpay", 96, 121, &receive_rtxdata)); - send_rtxdata_list = - g_list_append (send_rtxdata_list, add_sender (bin, "videotestsrc", - "rtpvrawpay", 97, 122, &receive_rtxdata)); - send_rtxdata_list = - g_list_append (send_rtxdata_list, add_sender (bin, "videotestsrc", - "rtpvrawpay", 98, 123, &receive_rtxdata)); - send_rtxdata_list = - g_list_append (send_rtxdata_list, add_sender (bin, "videotestsrc", - "rtpvrawpay", 99, 124, &receive_rtxdata)); - - pt_map = gst_structure_new ("application/x-rtp-pt-map", - "96", G_TYPE_UINT, 121, "97", G_TYPE_UINT, 122, - "98", G_TYPE_UINT, 123, "99", G_TYPE_UINT, 124, NULL); - g_object_set (rtprtxreceive, "payload-type-map", pt_map, NULL); gst_structure_free (pt_map); - - res = gst_element_link (funnel, rtprtxreceive); - fail_unless (res == TRUE, NULL); - res = gst_element_link (rtprtxreceive, sink); - fail_unless (res == TRUE, NULL); - - srcpad = gst_element_get_static_pad (rtprtxreceive, "src"); - gst_pad_add_probe (srcpad, - (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH), - (GstPadProbeCallback) rtprtxreceive_srcpad_probe_multiple, - &receive_rtxdata, NULL); - gst_object_unref (srcpad); - - sinkpad = gst_element_get_static_pad (rtprtxreceive, "sink"); - gst_pad_add_probe (sinkpad, - GST_PAD_PROBE_TYPE_EVENT_UPSTREAM, - (GstPadProbeCallback) rtprtxreceive_sinkpad_probe_check_drop, - &receive_rtxdata, NULL); - gst_object_unref (sinkpad); - - g_signal_connect (bus, "message::error", - (GCallback) message_received_multiple, NULL); - g_signal_connect (bus, "message::warning", - (GCallback) message_received_multiple, NULL); - - for (drop_every_n_packets = 2; drop_every_n_packets < 10; - drop_every_n_packets++) { - start_test_drop_multiple_and_check_results (bin, send_rtxdata_list, - &receive_rtxdata, drop_every_n_packets); - } - - /* cleanup */ - - g_list_free_full (send_rtxdata_list, free_rtx_send_data); - g_hash_table_destroy (receive_rtxdata.ssrc_to_nb_packets_map); - g_hash_table_destroy (receive_rtxdata.ssrc_to_seqnum_offset_map); - - gst_bus_remove_signal_watch (bus); - gst_object_unref (bus); - gst_object_unref (bin); + gst_harness_teardown (hrecv); } GST_END_TEST; -struct GenerateTestBuffersData -{ - GstElement *src, *capsfilter, *payloader, *sink; - GMutex mutex; - GCond cond; - GList *buffers; - gint num_buffers; - guint last_seqnum; -}; - -static void -fakesink_handoff (GstElement * sink, GstBuffer * buf, GstPad * pad, - gpointer user_data) -{ - struct GenerateTestBuffersData *data = user_data; - - g_mutex_lock (&data->mutex); - - if (data->num_buffers > 0) - data->buffers = g_list_append (data->buffers, gst_buffer_ref (buf)); - - /* if we have collected enough buffers, unblock the main thread to stop */ - if (--data->num_buffers <= 0) - g_cond_signal (&data->cond); - - if (data->num_buffers == 0) - g_object_get (data->payloader, "seqnum", &data->last_seqnum, NULL); - - g_mutex_unlock (&data->mutex); -} - -static GList * -generate_test_buffers (const gint num_buffers, guint ssrc, guint * payload_type) -{ - GstElement *bin; - GstCaps *videotestsrc_caps; - gboolean res; - struct GenerateTestBuffersData data; - - fail_unless (num_buffers > 0); - - g_mutex_init (&data.mutex); - g_cond_init (&data.cond); - data.buffers = NULL; - data.num_buffers = num_buffers; - - bin = gst_pipeline_new (NULL); - data.src = gst_element_factory_make ("videotestsrc", NULL); - data.capsfilter = gst_element_factory_make ("capsfilter", NULL); - data.payloader = gst_element_factory_make ("rtpvrawpay", NULL); - data.sink = gst_element_factory_make ("fakesink", NULL); - - /* small frame size will cause vrawpay to generate exactly one rtp packet - * per video frame, which we need for the max-size-time test */ - videotestsrc_caps = - gst_caps_from_string - ("video/x-raw,format=I420,width=10,height=10,framerate=30/1"); - - g_object_set (data.src, "do-timestamp", TRUE, NULL); - g_object_set (data.capsfilter, "caps", videotestsrc_caps, NULL); - g_object_set (data.payloader, "seqnum-offset", 1, "ssrc", ssrc, NULL); - g_object_set (data.sink, "signal-handoffs", TRUE, NULL); - g_signal_connect (data.sink, "handoff", (GCallback) fakesink_handoff, &data); - - gst_caps_unref (videotestsrc_caps); - - gst_bin_add_many (GST_BIN (bin), data.src, data.capsfilter, data.payloader, - data.sink, NULL); - res = gst_element_link_many (data.src, data.capsfilter, data.payloader, - data.sink, NULL); - fail_unless_equals_int (res, TRUE); - - g_mutex_lock (&data.mutex); - ASSERT_SET_STATE (bin, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC); - while (data.num_buffers > 0) - g_cond_wait (&data.cond, &data.mutex); - g_mutex_unlock (&data.mutex); - - g_object_get (data.payloader, "pt", payload_type, NULL); - - ASSERT_SET_STATE (bin, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS); - - fail_unless_equals_int (g_list_length (data.buffers), num_buffers); - fail_unless_equals_int (num_buffers, data.last_seqnum); - - g_mutex_clear (&data.mutex); - g_cond_clear (&data.cond); - gst_object_unref (bin); - - return data.buffers; -} - -static GstEvent * -create_rtx_event (guint seqnum, guint ssrc, guint payload_type) -{ - return gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, - gst_structure_new ("GstRTPRetransmissionRequest", - "seqnum", G_TYPE_UINT, seqnum, - "ssrc", G_TYPE_UINT, ssrc, - "payload-type", G_TYPE_UINT, payload_type, NULL)); -} - static void test_rtxsender_packet_retention (gboolean test_with_time) { - const gint num_buffers = test_with_time ? 30 : 10; - const gint half_buffers = num_buffers / 2; - const guint ssrc = 1234567; - const guint rtx_ssrc = 7654321; - const guint rtx_payload_type = 99; - GstStructure *pt_map; - GstStructure *ssrc_map; - GList *in_buffers, *node; - guint payload_type; - GstElement *rtxsend; - GstPad *srcpad, *sinkpad; - GstCaps *caps; - GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; - gint i, j; - gboolean res; - - /* generate test data */ - in_buffers = generate_test_buffers (num_buffers, ssrc, &payload_type); - - /* clear the global buffers list, which we are going to use later */ - gst_check_drop_buffers (); - - /* setup element & pads */ - rtxsend = gst_check_setup_element ("rtprtxsend"); - - pt_map = gst_structure_new ("application/x-rtp-pt-map", - "96", G_TYPE_UINT, rtx_payload_type, NULL); - ssrc_map = gst_structure_new ("application/x-rtp-ssrc-map", + guint master_ssrc = 1234567; + guint master_pt = 96; + guint rtx_ssrc = 7654321; + guint rtx_pt = 99; + gint num_buffers = test_with_time ? 30 : 10; + gint half_buffers = num_buffers / 2; + guint timestamp_delta = 90000 / 30; + guint timestamp = G_MAXUINT32 - half_buffers * timestamp_delta; + GstHarness *h; + GstStructure *pt_map = gst_structure_new ("application/x-rtp-pt-map", + "96", G_TYPE_UINT, rtx_pt, NULL); + GstStructure *ssrc_map = gst_structure_new ("application/x-rtp-ssrc-map", "1234567", G_TYPE_UINT, rtx_ssrc, NULL); - /* in both cases we want the rtxsend queue to store 'half_buffers' - * amount of buffers at most. In max-size-packets mode, it's trivial. - * In max-size-time mode, we specify almost half a second, which is - * the equivalent of 15 frames in a 30fps video stream */ - g_object_set (rtxsend, + h = gst_harness_new ("rtprtxsend"); + + /* In both cases we want the rtxsend queue to store 'half_buffers' + amount of buffers at most. In max-size-packets mode, it's trivial. + In max-size-time mode, we specify almost half a second, which is + the equivalent of 15 frames in a 30fps video stream. + */ + g_object_set (h->element, "max-size-packets", test_with_time ? 0 : half_buffers, "max-size-time", test_with_time ? 499 : 0, "payload-type-map", pt_map, "ssrc-map", ssrc_map, NULL); - gst_structure_free (pt_map); - gst_structure_free (ssrc_map); - - srcpad = gst_check_setup_src_pad (rtxsend, &srctemplate); - fail_unless_equals_int (gst_pad_set_active (srcpad, TRUE), TRUE); - - sinkpad = gst_check_setup_sink_pad (rtxsend, &sinktemplate); - fail_unless_equals_int (gst_pad_set_active (sinkpad, TRUE), TRUE); - ASSERT_SET_STATE (rtxsend, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS); - - caps = gst_caps_from_string ("application/x-rtp, " + gst_harness_set_src_caps_str (h, "application/x-rtp, " "media = (string)video, payload = (int)96, " "ssrc = (uint)1234567, clock-rate = (int)90000, " "encoding-name = (string)RAW"); - gst_check_setup_events (srcpad, rtxsend, caps, GST_FORMAT_TIME); - gst_caps_unref (caps); - - /* now push all buffers and request retransmission every time for all of them */ - node = in_buffers; - for (i = 1; i <= num_buffers; i++) { - GstBuffer *buffer = GST_BUFFER (node->data); - - /* verify that the original packets are correct */ - res = gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp); - fail_unless_equals_int (res, TRUE); - fail_unless_equals_int (gst_rtp_buffer_get_ssrc (&rtp), ssrc); - fail_unless_equals_int (gst_rtp_buffer_get_payload_type (&rtp), - payload_type); - fail_unless_equals_int (gst_rtp_buffer_get_seq (&rtp), i); - gst_rtp_buffer_unmap (&rtp); - - /* retransmit all the previous ones */ - for (j = 1; j < i; j++) { - /* synchronize with the chain() function of the "sinkpad" - * to make sure that rtxsend has pushed the rtx buffer out - * before continuing */ - GList *last_out_buffer = g_list_last (buffers); - g_mutex_lock (&check_mutex); - fail_unless_equals_int (gst_pad_push_event (sinkpad, - create_rtx_event (j, ssrc, payload_type)), TRUE); - /* wait for the rtx packet only if we expect the element - * to actually retransmit something */ - if (j >= MAX (i - half_buffers, 1)) { - guint64 end_time = g_get_monotonic_time () + G_TIME_SPAN_SECOND; - - while (last_out_buffer == g_list_last (buffers)) - fail_unless (g_cond_wait_until (&check_cond, &check_mutex, end_time)); - } - g_mutex_unlock (&check_mutex); - } - - /* push this one */ - gst_pad_push (srcpad, gst_buffer_ref (buffer)); - node = g_list_next (node); - } - /* verify the result. buffers should be in this order (numbers are seqnums): - * 1, 1rtx, 2, 1rtx, 2rtx, 3, ... , 9, 5rtx, 6rtx, 7rtx, 8rtx, 9rtx, 10 */ - { - GstRTPBuffer orig_rtp = GST_RTP_BUFFER_INIT; - gint expected_rtx_requests, expected_rtx_packets; - gint real_rtx_requests, real_rtx_packets; - - /* verify statistics first */ - expected_rtx_packets = half_buffers * half_buffers + - ((half_buffers - 1) / 2.0f) * half_buffers; - for (i = 1, expected_rtx_requests = 0; i < num_buffers; i++) - expected_rtx_requests += i; - - g_object_get (rtxsend, "num-rtx-requests", &real_rtx_requests, - "num-rtx-packets", &real_rtx_packets, NULL); - fail_unless_equals_int (expected_rtx_requests, real_rtx_requests); - fail_unless_equals_int (expected_rtx_packets, real_rtx_packets); - - /* and the number of actual buffers that we were pushed out of rtxsend */ - fail_unless_equals_int (g_list_length (buffers), - num_buffers + expected_rtx_packets); - - node = buffers; - for (i = 1; i <= num_buffers; i++) { - /* verify the retransmission packets */ - for (j = MAX (i - half_buffers, 1); j < i; j++) { - GST_INFO ("checking %d, %d", i, j); - - res = gst_rtp_buffer_map (GST_BUFFER (node->data), GST_MAP_READ, &rtp); - fail_unless_equals_int (res, TRUE); - - fail_if (gst_rtp_buffer_get_ssrc (&rtp) == ssrc); - fail_unless_equals_int (gst_rtp_buffer_get_ssrc (&rtp), rtx_ssrc); - fail_unless_equals_int (gst_rtp_buffer_get_payload_type (&rtp), - rtx_payload_type); - fail_unless_equals_int (GST_READ_UINT16_BE (gst_rtp_buffer_get_payload (&rtp)), j); /* j == rtx seqnum */ - - /* open the original packet for this rtx packet and verify timestamps */ - res = gst_rtp_buffer_map (GST_BUFFER (g_list_nth_data (in_buffers, - j - 1)), GST_MAP_READ, &orig_rtp); - fail_unless_equals_int (res, TRUE); - fail_unless_equals_int (gst_rtp_buffer_get_timestamp (&orig_rtp), - gst_rtp_buffer_get_timestamp (&rtp)); - gst_rtp_buffer_unmap (&orig_rtp); - - gst_rtp_buffer_unmap (&rtp); - node = g_list_next (node); - } - - /* verify the normal rtp flow packet */ - res = gst_rtp_buffer_map (GST_BUFFER (node->data), GST_MAP_READ, &rtp); - fail_unless_equals_int (res, TRUE); - fail_unless_equals_int (gst_rtp_buffer_get_ssrc (&rtp), ssrc); - fail_unless_equals_int (gst_rtp_buffer_get_payload_type (&rtp), - payload_type); - fail_unless_equals_int (gst_rtp_buffer_get_seq (&rtp), i); - gst_rtp_buffer_unmap (&rtp); - node = g_list_next (node); + /* Now push all buffers and request retransmission every time for all of them */ + for (gint i = 0; i < num_buffers; ++i, timestamp += timestamp_delta) { + /* Request to retransmit all the previous ones */ + for (gint j = 0; j < i; ++j) { + guint rtx_seqnum = 0x100 + j; + gst_harness_push_upstream_event (h, + create_rtx_event (master_ssrc, master_pt, rtx_seqnum)); + + /* Pull only the ones supposed to be retransmited */ + if (j >= i - half_buffers) + pull_and_verify (h, TRUE, rtx_ssrc, rtx_pt, rtx_seqnum); } + /* Check there no extra buffers in the harness queue */ + fail_unless_equals_int (gst_harness_buffers_in_queue (h), 0); + + /* We create RTP buffers with timestamps that will eventualy wrap around 0 + to be sure, rtprtxsend can handle it properly */ + push_pull_and_verify (h, + create_rtp_buffer_with_timestamp (master_ssrc, master_pt, 0x100 + i, + timestamp), FALSE, master_ssrc, master_pt, 0x100 + i); } - g_list_free_full (in_buffers, (GDestroyNotify) gst_buffer_unref); - gst_check_drop_buffers (); - - gst_check_teardown_src_pad (rtxsend); - gst_check_teardown_sink_pad (rtxsend); - gst_check_teardown_element (rtxsend); + gst_structure_free (pt_map); + gst_structure_free (ssrc_map); + gst_harness_teardown (h); } GST_START_TEST (test_rtxsender_max_size_packets) @@ -1450,132 +565,6 @@ GST_START_TEST (test_rtxsender_max_size_time) GST_END_TEST; -static void -compare_rtp_packets (GstBuffer * a, GstBuffer * b) -{ - GstRTPBuffer rtp_a = GST_RTP_BUFFER_INIT; - GstRTPBuffer rtp_b = GST_RTP_BUFFER_INIT; - - gst_rtp_buffer_map (a, GST_MAP_READ, &rtp_a); - gst_rtp_buffer_map (b, GST_MAP_READ, &rtp_b); - - fail_unless_equals_int (gst_rtp_buffer_get_header_len (&rtp_a), - gst_rtp_buffer_get_header_len (&rtp_b)); - fail_unless_equals_int (gst_rtp_buffer_get_version (&rtp_a), - gst_rtp_buffer_get_version (&rtp_b)); - fail_unless_equals_int (gst_rtp_buffer_get_ssrc (&rtp_a), - gst_rtp_buffer_get_ssrc (&rtp_b)); - fail_unless_equals_int (gst_rtp_buffer_get_seq (&rtp_a), - gst_rtp_buffer_get_seq (&rtp_b)); - fail_unless_equals_int (gst_rtp_buffer_get_csrc_count (&rtp_a), - gst_rtp_buffer_get_csrc_count (&rtp_b)); - fail_unless_equals_int (gst_rtp_buffer_get_marker (&rtp_a), - gst_rtp_buffer_get_marker (&rtp_b)); - fail_unless_equals_int (gst_rtp_buffer_get_payload_type (&rtp_a), - gst_rtp_buffer_get_payload_type (&rtp_b)); - fail_unless_equals_int (gst_rtp_buffer_get_timestamp (&rtp_a), - gst_rtp_buffer_get_timestamp (&rtp_b)); - fail_unless_equals_int (gst_rtp_buffer_get_extension (&rtp_a), - gst_rtp_buffer_get_extension (&rtp_b)); - - fail_unless_equals_int (gst_rtp_buffer_get_payload_len (&rtp_a), - gst_rtp_buffer_get_payload_len (&rtp_b)); - fail_unless_equals_int (memcmp (gst_rtp_buffer_get_payload (&rtp_a), - gst_rtp_buffer_get_payload (&rtp_b), - gst_rtp_buffer_get_payload_len (&rtp_a)), 0); - - gst_rtp_buffer_unmap (&rtp_a); - gst_rtp_buffer_unmap (&rtp_b); -} - -GST_START_TEST (test_rtxreceive_data_reconstruction) -{ - const guint ssrc = 1234567; - GList *in_buffers; - guint payload_type; - GstElement *rtxsend, *rtxrecv; - GstPad *srcpad, *sinkpad; - GstCaps *caps; - GstBuffer *buffer; - GstStructure *pt_map; - - /* generate test data */ - in_buffers = generate_test_buffers (1, ssrc, &payload_type); - - /* clear the global buffers list, which we are going to use later */ - gst_check_drop_buffers (); - - /* setup element & pads */ - rtxsend = gst_check_setup_element ("rtprtxsend"); - rtxrecv = gst_check_setup_element ("rtprtxreceive"); - - pt_map = gst_structure_new ("application/x-rtp-pt-map", - "96", G_TYPE_UINT, 99, NULL); - g_object_set (rtxsend, "payload-type-map", pt_map, NULL); - g_object_set (rtxrecv, "payload-type-map", pt_map, NULL); - gst_structure_free (pt_map); - - fail_unless_equals_int (gst_element_link (rtxsend, rtxrecv), TRUE); - - srcpad = gst_check_setup_src_pad (rtxsend, &srctemplate); - fail_unless_equals_int (gst_pad_set_active (srcpad, TRUE), TRUE); - - sinkpad = gst_check_setup_sink_pad (rtxrecv, &sinktemplate); - fail_unless_equals_int (gst_pad_set_active (sinkpad, TRUE), TRUE); - - ASSERT_SET_STATE (rtxsend, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS); - ASSERT_SET_STATE (rtxrecv, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS); - - caps = gst_caps_from_string ("application/x-rtp, " - "media = (string)video, payload = (int)96, " - "ssrc = (uint)1234567, clock-rate = (int)90000, " - "encoding-name = (string)RAW"); - gst_check_setup_events (srcpad, rtxsend, caps, GST_FORMAT_TIME); - gst_caps_unref (caps); - - /* push buffer */ - buffer = gst_buffer_ref (GST_BUFFER (in_buffers->data)); - fail_unless_equals_int (gst_pad_push (srcpad, buffer), GST_FLOW_OK); - - /* push retransmission request */ - { - GList *last_out_buffer; - guint64 end_time; - gboolean res; - - /* synchronize with the chain() function of the "sinkpad" - * to make sure that rtxsend has pushed the rtx buffer out - * before continuing */ - last_out_buffer = g_list_last (buffers); - g_mutex_lock (&check_mutex); - fail_unless_equals_int (gst_pad_push_event (sinkpad, - create_rtx_event (1, ssrc, payload_type)), TRUE); - end_time = g_get_monotonic_time () + G_TIME_SPAN_SECOND; - do - res = g_cond_wait_until (&check_cond, &check_mutex, end_time); - while (res == TRUE && last_out_buffer == g_list_last (buffers)); - fail_unless_equals_int (res, TRUE); - g_mutex_unlock (&check_mutex); - } - - /* verify */ - fail_unless_equals_int (g_list_length (buffers), 2); - compare_rtp_packets (GST_BUFFER (buffers->data), - GST_BUFFER (buffers->next->data)); - - /* cleanup */ - g_list_free_full (in_buffers, (GDestroyNotify) gst_buffer_unref); - gst_check_drop_buffers (); - - gst_check_teardown_src_pad (rtxsend); - gst_check_teardown_sink_pad (rtxrecv); - gst_element_unlink (rtxsend, rtxrecv); - gst_check_teardown_element (rtxsend); - gst_check_teardown_element (rtxrecv); -} - -GST_END_TEST; - static Suite * rtprtx_suite (void) { @@ -1586,12 +575,11 @@ rtprtx_suite (void) suite_add_tcase (s, tc_chain); - tcase_add_test (tc_chain, test_push_forward_seq); - tcase_add_test (tc_chain, test_drop_one_sender); - tcase_add_test (tc_chain, test_drop_multiple_sender); + tcase_add_test (tc_chain, test_rtxsend_rtxreceive); + tcase_add_test (tc_chain, test_rtxsend_rtxreceive_with_packet_loss); + tcase_add_test (tc_chain, test_multi_rtxsend_rtxreceive_with_packet_loss); tcase_add_test (tc_chain, test_rtxsender_max_size_packets); tcase_add_test (tc_chain, test_rtxsender_max_size_time); - tcase_add_test (tc_chain, test_rtxreceive_data_reconstruction); return s; } -- 2.7.4