/* we also have a list of src buffers */
static GList *inbuffers = NULL;
-static GMainLoop *main_loop;
-
#define RTP_CAPS_STRING \
"application/x-rtp, " \
"media = (string)audio, " \
GST_END_TEST;
static void
-message_received (GstBus * bus, GstMessage * message, GstPipeline * bin)
+message_received (GstBus * bus, GstMessage * message, gboolean * eos)
{
GST_INFO ("bus message from \"%" GST_PTR_FORMAT "\": %" GST_PTR_FORMAT,
GST_MESSAGE_SRC (message), message);
switch (message->type) {
case GST_MESSAGE_EOS:
- g_main_loop_quit (main_loop);
+ *eos = TRUE;
break;
case GST_MESSAGE_WARNING:{
GError *gerror;
gst_message_parse_error (message, &gerror, &debug);
gst_object_default_error (GST_MESSAGE_SRC (message), gerror, debug);
+ fail ("Error: %s / %s", gerror->message, debug);
g_error_free (gerror);
g_free (debug);
- g_main_loop_quit (main_loop);
break;
}
default:
start_test_drop_and_check_results (GstElement * bin, GstElement * rtppayloader,
GstElement * rtprtxsend, GstElement * rtprtxreceive,
RTXSendData * send_rtxdata, RTXReceiveData * receive_rtxdata,
- guint drop_every_n_packets)
+ guint drop_every_n_packets, gboolean * eos)
{
GstStateChangeReturn state_res = GST_STATE_CHANGE_FAILURE;
guint nbrtxrequests = 0;
ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE);
GST_INFO ("running main loop");
- g_main_loop_run (main_loop);
+ while (!*eos)
+ g_main_context_iteration (NULL, TRUE);
/* check results */
guint drop_every_n_packets = 0;
RTXSendData send_rtxdata;
RTXReceiveData receive_rtxdata;
+ gboolean eos = FALSE;
GST_INFO ("preparing test");
gst_object_unref (sinkpad);
gst_object_unref (srcpad);
- main_loop = g_main_loop_new (NULL, FALSE);
- g_signal_connect (bus, "message::error", (GCallback) message_received, bin);
- g_signal_connect (bus, "message::warning", (GCallback) message_received, bin);
- g_signal_connect (bus, "message::eos", (GCallback) message_received, bin);
+ g_signal_connect (bus, "message::error", (GCallback) message_received, NULL);
+ g_signal_connect (bus, "message::warning", (GCallback) message_received,
+ NULL);
+ g_signal_connect (bus, "message::eos", (GCallback) message_received, &eos);
for (drop_every_n_packets = 2; drop_every_n_packets < 10;
drop_every_n_packets++) {
start_test_drop_and_check_results (bin, rtppayloader, rtprtxsend,
- rtprtxreceive, &send_rtxdata, &receive_rtxdata, drop_every_n_packets);
+ rtprtxreceive, &send_rtxdata, &receive_rtxdata, drop_every_n_packets,
+ &eos);
}
/* cleanup */
- g_main_loop_unref (main_loop);
gst_consistency_checker_free (chk_1);
gst_consistency_checker_free (chk_2);
gst_consistency_checker_free (chk_3);
GST_END_TEST;
-GMutex lock_eos;
-static gint nb_sources;
-static gint nb_eos;
-
static void
-message_received_multiple (GstBus * bus, GstMessage * message,
- GstPipeline * bin)
+message_received_multiple (GstBus * bus, GstMessage * message, gpointer data)
{
GST_INFO ("bus message from \"%" GST_PTR_FORMAT "\": %" GST_PTR_FORMAT,
GST_MESSAGE_SRC (message), message);
switch (message->type) {
- case GST_MESSAGE_EOS:
- g_main_loop_quit (main_loop);
- break;
case GST_MESSAGE_WARNING:{
GError *gerror;
gchar *debug;
gst_message_parse_error (message, &gerror, &debug);
gst_object_default_error (GST_MESSAGE_SRC (message), gerror, debug);
+ fail ("Error: %s / %s", gerror->message, debug);
g_error_free (gerror);
g_free (debug);
- g_main_loop_quit (main_loop);
break;
}
default:
guint nb_packets;
guint drop_every_n_packets;
guint payload_type_master;
+ guint total_packets;
} RTXSendMultipleData;
/* drop some packets */
/* count packets of the main stream */
++rtxdata->nb_packets;
/* drop some packets */
- if (rtxdata->count < rtxdata->drop_every_n_packets) {
+ /* but make sure we never drop the last one, otherwise there
+ * will be nothing to trigger a retransmission.
+ */
+ if (rtxdata->count < rtxdata->drop_every_n_packets ||
+ rtxdata->nb_packets == rtxdata->total_packets) {
++rtxdata->count;
} else {
/* drop a packet every 'rtxdata->count' packets */
/* make sure every sources has sent all their buffers */
static GstPadProbeReturn
-source_srcpad_probe_multiple (GstPad * pad, GstPadProbeInfo * info,
+source_srcpad_probe_multiple_drop_eos (GstPad * pad, GstPadProbeInfo * info,
gpointer user_data)
{
- GstPadProbeReturn ret = GST_PAD_PROBE_OK;
+ GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
- if (info->type ==
- (GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_PUSH)) {
- GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
- if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
- g_mutex_lock (&lock_eos);
- ++nb_eos;
- if (nb_eos < nb_sources)
- ret = GST_PAD_PROBE_DROP;
- g_mutex_unlock (&lock_eos);
- }
- }
-
- return ret;
+ if (GST_EVENT_TYPE (event) == GST_EVENT_EOS)
+ return GST_PAD_PROBE_DROP;
+ else
+ return GST_PAD_PROBE_OK;
}
+typedef struct
+{
+ GHashTable *ssrc_to_nb_packets_map;
+ GHashTable *ssrc_to_seqnum_offset_map;
+ guint seqnum_offset;
+
+ gint to_send;
+ volatile gint dropped_requests;
+ volatile gint received;
+ gboolean request_passed;
+} RTXReceiveMultipleData;
+
/* add one branch videotestsrc ! rtpvrawpay ! rtprtxsend ! queue ! funnel. */
static RTXSendMultipleData *
add_sender (GstElement * bin, const gchar * src_name,
const gchar * payloader_name, guint payload_type_master,
- guint payload_type_aux)
+ guint payload_type_aux, RTXReceiveMultipleData * rtxdata)
{
GstElement *src = NULL;
+ GstCaps *caps;
GstElement *rtppayloader = NULL;
GstElement *rtprtxsend = NULL;
GstElement *queue = NULL;
send_rtxdata->nb_packets = 0;
send_rtxdata->drop_every_n_packets = 0;
send_rtxdata->payload_type_master = payload_type_master;
+ send_rtxdata->total_packets = 25;
+ rtxdata->to_send += send_rtxdata->total_packets;
src = gst_element_factory_make (src_name, NULL);
rtppayloader = gst_element_factory_make (payloader_name, NULL);
pt_master, G_TYPE_UINT, payload_type_aux, NULL);
g_free (pt_master);
- g_object_set (src, "num-buffers", 25, NULL);
+ g_object_set (src, "num-buffers", send_rtxdata->total_packets, NULL);
+ g_object_set (src, "is-live", TRUE, NULL);
g_object_set (rtppayloader, "pt", payload_type_master, NULL);
g_object_set (rtppayloader, "seqnum-offset", 1, NULL);
g_object_set (rtprtxsend, "payload-type-map", pt_map, NULL);
gst_bin_add_many (GST_BIN (bin), src, rtppayloader, rtprtxsend, queue, NULL);
- res = gst_element_link (src, rtppayloader);
+ /* Make sure we have one buffer per frame, makes it easier to count! */
+ caps =
+ gst_caps_from_string ("video/x-raw, width=20, height=10, framerate=30/1");
+ res = gst_element_link_filtered (src, rtppayloader, caps);
+ gst_caps_unref (caps);
fail_unless (res == TRUE, NULL);
res = gst_element_link (rtppayloader, rtprtxsend);
fail_unless (res == TRUE, NULL);
gst_object_unref (srcpad);
/* to make sure every sources has sent all their buffers */
- srcpad = gst_element_get_static_pad (queue, "src");
- gst_pad_add_probe (srcpad,
- (GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_PUSH),
- (GstPadProbeCallback) source_srcpad_probe_multiple, NULL, NULL);
+ srcpad = gst_element_get_static_pad (src, "src");
+ gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
+ (GstPadProbeCallback) source_srcpad_probe_multiple_drop_eos, NULL, NULL);
gst_object_unref (srcpad);
- ++nb_sources;
-
return send_rtxdata;
}
-typedef struct
+static GstPadProbeReturn
+rtprtxreceive_sinkpad_probe_check_drop (GstPad * pad, GstPadProbeInfo * info,
+ gpointer user_data)
{
- GHashTable *ssrc_to_nb_packets_map;
- GHashTable *ssrc_to_seqnum_offset_map;
- guint seqnum_offset;
-} RTXReceiveMultipleData;
+ GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
+ RTXReceiveMultipleData *rtxdata = (RTXReceiveMultipleData *) user_data;
+
+ if (GST_EVENT_TYPE (event) == GST_EVENT_CUSTOM_UPSTREAM &&
+ gst_event_get_structure (event) != NULL &&
+ gst_structure_has_name (gst_event_get_structure (event),
+ "GstRTPRetransmissionRequest"))
+ rtxdata->request_passed = TRUE;
+
+ return GST_PAD_PROBE_OK;
+}
+
+static gboolean
+check_finished (RTXReceiveMultipleData * rtxdata)
+{
+ return (g_atomic_int_get (&rtxdata->received) >= (rtxdata->to_send -
+ g_atomic_int_get (&rtxdata->dropped_requests)));
+}
static GstPadProbeReturn
rtprtxreceive_srcpad_probe_multiple (GstPad * pad, GstPadProbeInfo * info,
ssrc = gst_rtp_buffer_get_ssrc (&rtp);
seqnum = gst_rtp_buffer_get_seq (&rtp);
+ g_atomic_int_inc (&rtxdata->received);
+ if (check_finished (rtxdata))
+ g_main_context_wakeup (NULL);
+
if (!g_hash_table_lookup_extended (rtxdata->ssrc_to_seqnum_offset_map,
GUINT_TO_POINTER (ssrc), NULL, &seqnum_prev)) {
/*In our test we take care to never drop the first buffer */
return GST_PAD_PROBE_OK;
}
+
/* check if there is a dropped packet
* (in our test every packet arrived in increasing order) */
if (seqnum > GPOINTER_TO_UINT (seqnum_prev) + rtxdata->seqnum_offset) {
"payload-type", G_TYPE_UINT,
gst_rtp_buffer_get_payload_type (&rtp),
NULL));
+ rtxdata->request_passed = FALSE;
gst_pad_push_event (peerpad, event);
+ if (!rtxdata->request_passed) {
+ g_atomic_int_inc (&rtxdata->dropped_requests);
+ if (check_finished (rtxdata))
+ g_main_context_wakeup (NULL);
+ }
}
gst_object_unref (peerpad);
GST_INFO ("starting test");
+ g_atomic_int_set (&receive_rtxdata->received, 0);
+ g_atomic_int_set (&receive_rtxdata->dropped_requests, 0);
+
g_hash_table_remove_all (receive_rtxdata->ssrc_to_nb_packets_map);
g_hash_table_remove_all (receive_rtxdata->ssrc_to_seqnum_offset_map);
ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE);
GST_INFO ("running main loop");
- g_main_loop_run (main_loop);
+ while (!check_finished (receive_rtxdata))
+ g_main_context_iteration (NULL, TRUE);
/* check results */
itr_elements = gst_bin_iterate_elements (GST_BIN (bin));
g_hash_table_foreach (receive_rtxdata->ssrc_to_nb_packets_map,
compute_total_packets_received, (gpointer) & sum_all_packets_received);
- /* check that we sent as many packets as received */
- /* when eos is received by sources we cannot ensure that every packets
- * will be received by sinks (maybe queue flush ?)
- */
+ sum_all_packets_received +=
+ g_atomic_int_get (&receive_rtxdata->dropped_requests);
fail_if (sum_all_packets_sent < sum_all_packets_received);
/* some packet are not received, I still have to figure out why
GstElement *bin, *funnel, *rtprtxreceive, *sink;
GstBus *bus;
gboolean res;
- GstPad *srcpad;
+ GstPad *srcpad, *sinkpad;
guint drop_every_n_packets = 0;
GList *send_rtxdata_list = NULL;
- RTXReceiveMultipleData receive_rtxdata;
+ RTXReceiveMultipleData receive_rtxdata = { NULL };
GstStructure *pt_map;
GST_INFO ("preparing test");
g_object_set (sink, "qos", FALSE, NULL);
gst_bin_add_many (GST_BIN (bin), funnel, rtprtxreceive, sink, NULL);
- nb_sources = 0;
- g_mutex_init (&lock_eos);
-
send_rtxdata_list =
g_list_append (send_rtxdata_list, add_sender (bin, "videotestsrc",
- "rtpvrawpay", 96, 121));
+ "rtpvrawpay", 96, 121, &receive_rtxdata));
send_rtxdata_list =
g_list_append (send_rtxdata_list, add_sender (bin, "videotestsrc",
- "rtpvrawpay", 97, 122));
+ "rtpvrawpay", 97, 122, &receive_rtxdata));
send_rtxdata_list =
g_list_append (send_rtxdata_list, add_sender (bin, "videotestsrc",
- "rtpvrawpay", 98, 123));
+ "rtpvrawpay", 98, 123, &receive_rtxdata));
send_rtxdata_list =
g_list_append (send_rtxdata_list, add_sender (bin, "videotestsrc",
- "rtpvrawpay", 99, 124));
+ "rtpvrawpay", 99, 124, &receive_rtxdata));
pt_map = gst_structure_new ("application/x-rtp-pt-map",
"96", G_TYPE_UINT, 121, "97", G_TYPE_UINT, 122,
&receive_rtxdata, NULL);
gst_object_unref (srcpad);
- main_loop = g_main_loop_new (NULL, FALSE);
+ sinkpad = gst_element_get_static_pad (rtprtxreceive, "sink");
+ gst_pad_add_probe (sinkpad,
+ GST_PAD_PROBE_TYPE_EVENT_UPSTREAM,
+ (GstPadProbeCallback) rtprtxreceive_sinkpad_probe_check_drop,
+ &receive_rtxdata, NULL);
+ gst_object_unref (sinkpad);
+
g_signal_connect (bus, "message::error",
- (GCallback) message_received_multiple, bin);
+ (GCallback) message_received_multiple, NULL);
g_signal_connect (bus, "message::warning",
- (GCallback) message_received_multiple, bin);
- g_signal_connect (bus, "message::eos", (GCallback) message_received_multiple,
- bin);
+ (GCallback) message_received_multiple, NULL);
for (drop_every_n_packets = 2; drop_every_n_packets < 10;
drop_every_n_packets++) {
- nb_eos = 0;
start_test_drop_multiple_and_check_results (bin, send_rtxdata_list,
&receive_rtxdata, drop_every_n_packets);
}
/* cleanup */
- g_main_loop_unref (main_loop);
g_list_free_full (send_rtxdata_list, free_rtx_send_data);
g_hash_table_destroy (receive_rtxdata.ssrc_to_nb_packets_map);
gst_bus_remove_signal_watch (bus);
gst_object_unref (bus);
gst_object_unref (bin);
-
- g_mutex_clear (&lock_eos);
}
GST_END_TEST;
* to actually retransmit something */
if (j >= MAX (i - half_buffers, 1)) {
guint64 end_time = g_get_monotonic_time () + G_TIME_SPAN_SECOND;
- do
- res = g_cond_wait_until (&check_cond, &check_mutex, end_time);
- while (res == TRUE && last_out_buffer == g_list_last (buffers));
- fail_unless_equals_int (res, TRUE);
+
+ while (last_out_buffer == g_list_last (buffers))
+ fail_unless (g_cond_wait_until (&check_cond, &check_mutex, end_time));
}
g_mutex_unlock (&check_mutex);
}