GST_ELEMENT_REGISTER_DEFINE (rtprtxsend, "rtprtxsend", GST_RANK_NONE,
GST_TYPE_RTP_RTX_SEND);
+#define IS_RTX_ENABLED(rtx) (g_hash_table_size ((rtx)->rtx_pt_map) > 0)
+
typedef struct
{
guint16 seqnum;
g_slice_free (SSRCRtxData, data);
}
+typedef enum
+{
+ RTX_TASK_START,
+ RTX_TASK_PAUSE,
+ RTX_TASK_STOP,
+} RtxTaskState;
+
+static void
+gst_rtp_rtx_send_set_flushing (GstRtpRtxSend * rtx, gboolean flush)
+{
+ GST_OBJECT_LOCK (rtx);
+ gst_data_queue_set_flushing (rtx->queue, flush);
+ gst_data_queue_flush (rtx->queue);
+ GST_OBJECT_UNLOCK (rtx);
+}
+
+static gboolean
+gst_rtp_rtx_send_set_task_state (GstRtpRtxSend * rtx, RtxTaskState task_state)
+{
+ GstTask *task = GST_PAD_TASK (rtx->srcpad);
+ GstPadMode mode = GST_PAD_MODE (rtx->srcpad);
+ gboolean ret = TRUE;
+
+ switch (task_state) {
+ case RTX_TASK_START:
+ {
+ gboolean active = task && GST_TASK_STATE (task) == GST_TASK_STARTED;
+ if (IS_RTX_ENABLED (rtx) && mode != GST_PAD_MODE_NONE && !active) {
+ GST_DEBUG_OBJECT (rtx, "Starting RTX task");
+ gst_rtp_rtx_send_set_flushing (rtx, FALSE);
+ ret = gst_pad_start_task (rtx->srcpad,
+ (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
+ }
+ break;
+ }
+ case RTX_TASK_PAUSE:
+ if (task) {
+ GST_DEBUG_OBJECT (rtx, "Pausing RTX task");
+ gst_rtp_rtx_send_set_flushing (rtx, TRUE);
+ ret = gst_pad_pause_task (rtx->srcpad);
+ }
+ break;
+ case RTX_TASK_STOP:
+ if (task) {
+ GST_DEBUG_OBJECT (rtx, "Stopping RTX task");
+ gst_rtp_rtx_send_set_flushing (rtx, TRUE);
+ ret = gst_pad_stop_task (rtx->srcpad);
+ }
+ break;
+ }
+
+ return ret;
+}
+
static void
gst_rtp_rtx_send_class_init (GstRtpRtxSendClass * klass)
{
rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS;
}
-static void
-gst_rtp_rtx_send_set_flushing (GstRtpRtxSend * rtx, gboolean flush)
-{
- GST_OBJECT_LOCK (rtx);
- gst_data_queue_set_flushing (rtx->queue, flush);
- gst_data_queue_flush (rtx->queue);
- GST_OBJECT_UNLOCK (rtx);
-}
-
static gboolean
gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue,
guint visible, guint bytes, guint64 time, gpointer checkdata)
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:
gst_pad_push_event (rtx->srcpad, event);
- gst_rtp_rtx_send_set_flushing (rtx, TRUE);
- gst_pad_pause_task (rtx->srcpad);
+ gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_PAUSE);
return TRUE;
case GST_EVENT_FLUSH_STOP:
gst_pad_push_event (rtx->srcpad, event);
- gst_rtp_rtx_send_set_flushing (rtx, FALSE);
- gst_pad_start_task (rtx->srcpad,
- (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
+ gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_START);
return TRUE;
case GST_EVENT_EOS:
GST_INFO_OBJECT (rtx, "Got EOS - enqueueing it");
data->destroy (data);
} else {
GST_LOG_OBJECT (rtx, "flushing");
- gst_pad_pause_task (rtx->srcpad);
+ gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_PAUSE);
}
}
switch (mode) {
case GST_PAD_MODE_PUSH:
if (active) {
- gst_rtp_rtx_send_set_flushing (rtx, FALSE);
- ret = gst_pad_start_task (rtx->srcpad,
- (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
+ ret = gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_START);
} else {
- gst_rtp_rtx_send_set_flushing (rtx, TRUE);
- ret = gst_pad_stop_task (rtx->srcpad);
+ ret = gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_STOP);
}
GST_INFO_OBJECT (rtx, "activate_mode: active %d, ret %d", active, ret);
break;
gst_structure_foreach (rtx->rtx_pt_map_structure, structure_to_hash_table,
rtx->rtx_pt_map);
GST_OBJECT_UNLOCK (rtx);
+
+ if (IS_RTX_ENABLED (rtx))
+ gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_START);
+ else
+ gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_STOP);
+
break;
case PROP_MAX_SIZE_TIME:
GST_OBJECT_LOCK (rtx);
return ret;
}
+static GstStructure *
+create_rtx_map (const gchar * name, guint key, guint value)
+{
+ gchar *key_str = g_strdup_printf ("%u", key);
+ GstStructure *s = gst_structure_new (name,
+ key_str, G_TYPE_UINT, value, NULL);
+ g_free (key_str);
+ return s;
+}
+
+GST_START_TEST (test_rtxsend_basic)
+{
+ const guint32 main_ssrc = 1234567;
+ const guint main_pt = 96;
+ const guint32 rtx_ssrc = 7654321;
+ const guint rtx_pt = 106;
+
+ GstHarness *h = gst_harness_new ("rtprtxsend");
+ GstStructure *ssrc_map =
+ create_rtx_map ("application/x-rtp-ssrc-map", main_ssrc, rtx_ssrc);
+ GstStructure *pt_map =
+ create_rtx_map ("application/x-rtp-pt-map", main_pt, rtx_pt);
+
+ g_object_set (h->element, "ssrc-map", ssrc_map, NULL);
+ g_object_set (h->element, "payload-type-map", pt_map, NULL);
+
+ gst_harness_set_src_caps_str (h, "application/x-rtp, "
+ "media = (string)video, payload = (int)96, "
+ "ssrc = (uint)1234567, clock-rate = (int)90000, "
+ "encoding-name = (string)RAW");
+
+ /* push a packet */
+ fail_unless_equals_int (GST_FLOW_OK,
+ gst_harness_push (h, create_rtp_buffer (main_ssrc, main_pt, 0)));
+
+ /* and check it came through */
+ pull_and_verify (h, FALSE, main_ssrc, main_pt, 0);
+
+ /* now request this packet as rtx */
+ gst_harness_push_upstream_event (h, create_rtx_event (main_ssrc, main_pt, 0));
+
+ /* and verify we got an rtx-packet for it */
+ pull_and_verify (h, TRUE, rtx_ssrc, rtx_pt, 0);
+
+ gst_structure_free (ssrc_map);
+ gst_structure_free (pt_map);
+ gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_rtxsend_disabled_enabled_disabled)
+{
+ const guint32 main_ssrc = 1234567;
+ const guint main_pt = 96;
+ const guint32 rtx_ssrc = 7654321;
+ const guint rtx_pt = 106;
+
+ GstHarness *h = gst_harness_new ("rtprtxsend");
+ GstStructure *ssrc_map =
+ create_rtx_map ("application/x-rtp-ssrc-map", main_ssrc, rtx_ssrc);
+ GstStructure *pt_map =
+ create_rtx_map ("application/x-rtp-pt-map", main_pt, rtx_pt);
+ GstStructure *empty_pt_map =
+ gst_structure_new_empty ("application/x-rtp-pt-map");
+
+ /* set ssrc-map, but not pt-map, making the element work in passthrough */
+ g_object_set (h->element, "ssrc-map", ssrc_map, NULL);
+
+ gst_harness_set_src_caps_str (h, "application/x-rtp, "
+ "media = (string)video, payload = (int)96, "
+ "ssrc = (uint)1234567, clock-rate = (int)90000, "
+ "encoding-name = (string)RAW");
+
+ /* push, pull, request-rtx, verify nothing arrives */
+ fail_unless_equals_int (GST_FLOW_OK,
+ gst_harness_push (h, create_rtp_buffer (main_ssrc, main_pt, 0)));
+ pull_and_verify (h, FALSE, main_ssrc, main_pt, 0);
+ gst_harness_push_upstream_event (h, create_rtx_event (main_ssrc, main_pt, 0));
+ fail_unless_equals_int (0, gst_harness_buffers_in_queue (h));
+ /* verify there is no task on the rtxsend srcpad */
+ fail_unless (GST_PAD_TASK (GST_PAD_PEER (h->sinkpad)) == NULL);
+
+ /* now enable rtx by setting the pt-map */
+ g_object_set (h->element, "payload-type-map", pt_map, NULL);
+
+ /* push, pull, request rtx, pull rtx */
+ fail_unless_equals_int (GST_FLOW_OK,
+ gst_harness_push (h, create_rtp_buffer (main_ssrc, main_pt, 1)));
+ pull_and_verify (h, FALSE, main_ssrc, main_pt, 1);
+ gst_harness_push_upstream_event (h, create_rtx_event (main_ssrc, main_pt, 1));
+ pull_and_verify (h, TRUE, rtx_ssrc, rtx_pt, 1);
+ /* verify there is a task on the rtxsend srcpad */
+ fail_unless (GST_PAD_TASK (GST_PAD_PEER (h->sinkpad)) != NULL);
+
+ /* now enable disable rtx agian by setting an empty pt-map */
+ g_object_set (h->element, "payload-type-map", empty_pt_map, NULL);
+
+ /* push, pull, request-rtx, verify nothing arrives */
+ fail_unless_equals_int (GST_FLOW_OK,
+ gst_harness_push (h, create_rtp_buffer (main_ssrc, main_pt, 2)));
+ pull_and_verify (h, FALSE, main_ssrc, main_pt, 2);
+ gst_harness_push_upstream_event (h, create_rtx_event (main_ssrc, main_pt, 2));
+ fail_unless_equals_int (0, gst_harness_buffers_in_queue (h));
+ /* verify the task is gone again */
+ fail_unless (GST_PAD_TASK (GST_PAD_PEER (h->sinkpad)) == NULL);
+
+ gst_structure_free (ssrc_map);
+ gst_structure_free (pt_map);
+ gst_structure_free (empty_pt_map);
+ gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_rtxsend_configured_not_playing_cleans_up)
+{
+ GstElement *rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
+ GstStructure *ssrc_map =
+ create_rtx_map ("application/x-rtp-ssrc-map", 123, 96);
+ GstStructure *pt_map = create_rtx_map ("application/x-rtp-pt-map", 321, 106);
+
+ g_object_set (rtxsend, "ssrc-map", ssrc_map, NULL);
+ g_object_set (rtxsend, "payload-type-map", pt_map, NULL);
+ gst_structure_free (ssrc_map);
+ gst_structure_free (pt_map);
+
+ g_usleep (G_USEC_PER_SEC);
+
+ gst_object_unref (rtxsend);
+}
+
+GST_END_TEST;
+
+
GST_START_TEST (test_rtxreceive_empty_rtx_packet)
{
guint rtx_ssrc = 7654321;
suite_add_tcase (s, tc_chain);
+ tcase_add_test (tc_chain, test_rtxsend_basic);
+ tcase_add_test (tc_chain, test_rtxsend_disabled_enabled_disabled);
+ tcase_add_test (tc_chain, test_rtxsend_configured_not_playing_cleans_up);
+
tcase_add_test (tc_chain, test_rtxreceive_empty_rtx_packet);
tcase_add_test (tc_chain, test_rtxsend_rtxreceive);
tcase_add_test (tc_chain, test_rtxsend_rtxreceive_with_packet_loss);