GST_STATIC_CAPS ("application/x-rtp")
);
+static gboolean gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue,
+ guint visible, guint bytes, guint64 time, gpointer checkdata);
+
static gboolean gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent,
GstEvent * event);
static gboolean gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent,
static GstFlowReturn gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent,
GstBuffer * buffer);
+static void gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx);
+static gboolean gst_rtp_rtx_send_activate_mode (GstPad * pad,
+ GstObject * parent, GstPadMode mode, gboolean active);
+
static GstStateChangeReturn gst_rtp_rtx_send_change_state (GstElement *
element, GstStateChange transition);
gst_rtp_rtx_send_reset (GstRtpRtxSend * rtx)
{
GST_OBJECT_LOCK (rtx);
- g_queue_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL);
- g_queue_clear (rtx->pending);
+ gst_data_queue_flush (rtx->queue);
g_hash_table_remove_all (rtx->ssrc_data);
g_hash_table_remove_all (rtx->rtx_ssrcs);
rtx->num_rtx_requests = 0;
g_hash_table_unref (rtx->rtx_pt_map);
if (rtx->rtx_pt_map_structure)
gst_structure_free (rtx->rtx_pt_map_structure);
- g_queue_free_full (rtx->pending, (GDestroyNotify) gst_buffer_unref);
+ g_object_unref (rtx->queue);
G_OBJECT_CLASS (gst_rtp_rtx_send_parent_class)->finalize (object);
}
GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad);
gst_pad_set_event_function (rtx->srcpad,
GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_src_event));
+ gst_pad_set_activatemode_function (rtx->srcpad,
+ GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_activate_mode));
gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad);
rtx->sinkpad =
GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_chain));
gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
- rtx->pending = g_queue_new ();
+ rtx->queue = gst_data_queue_new (gst_rtp_rtx_send_queue_check_full, NULL,
+ NULL, rtx);
rtx->ssrc_data = g_hash_table_new_full (g_direct_hash, g_direct_equal,
NULL, (GDestroyNotify) ssrc_rtx_data_free);
rtx->rtx_ssrcs = g_hash_table_new (g_direct_hash, g_direct_equal);
rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS;
}
+static void
+gst_rtp_rtx_send_set_flushing (GstRtpRtxSend * rtx, gboolean flush)
+{
+ GST_OBJECT_LOCK (rtx);
+ gst_data_queue_set_flushing (rtx->queue, flush);
+ gst_data_queue_flush (rtx->queue);
+ GST_OBJECT_UNLOCK (rtx);
+}
+
+static gboolean
+gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue,
+ guint visible, guint bytes, guint64 time, gpointer checkdata)
+{
+ return FALSE;
+}
+
+static void
+gst_rtp_rtx_data_queue_item_free (gpointer item)
+{
+ GstDataQueueItem *data = item;
+ if (data->object)
+ gst_mini_object_unref (data->object);
+ g_slice_free (GstDataQueueItem, data);
+}
+
+static gboolean
+gst_rtp_rtx_send_push_out (GstRtpRtxSend * rtx, gpointer object)
+{
+ GstDataQueueItem *data;
+ gboolean success;
+
+ data = g_slice_new0 (GstDataQueueItem);
+ data->object = GST_MINI_OBJECT (object);
+ data->size = 1;
+ data->duration = 1;
+ data->visible = TRUE;
+ data->destroy = gst_rtp_rtx_data_queue_item_free;
+
+ success = gst_data_queue_push (rtx->queue, data);
+
+ if (!success)
+ data->destroy (data);
+
+ return success;
+}
+
static guint32
gst_rtp_rtx_send_choose_ssrc (GstRtpRtxSend * rtx, guint32 choice,
gboolean consider_choice)
if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
guint seqnum = 0;
guint ssrc = 0;
+ GstBuffer *rtx_buf = NULL;
/* retrieve seqnum of the packet that need to be restransmisted */
if (!gst_structure_get_uint (s, "seqnum", &seqnum))
if (iter) {
BufferQueueItem *item = g_sequence_get (iter);
GST_DEBUG_OBJECT (rtx, "found %" G_GUINT16_FORMAT, item->seqnum);
- g_queue_push_tail (rtx->pending,
- gst_rtp_rtx_buffer_new (rtx, item->buffer));
+ rtx_buf = gst_rtp_rtx_buffer_new (rtx, item->buffer);
}
}
GST_OBJECT_UNLOCK (rtx);
+ if (rtx_buf)
+ gst_rtp_rtx_send_push_out (rtx, rtx_buf);
+
gst_event_unref (event);
res = TRUE;
GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_FLUSH_START:
+ gst_pad_push_event (rtx->srcpad, event);
+ gst_rtp_rtx_send_set_flushing (rtx, TRUE);
+ gst_pad_pause_task (rtx->srcpad);
+ return TRUE;
+ case GST_EVENT_FLUSH_STOP:
+ gst_pad_push_event (rtx->srcpad, event);
+ gst_rtp_rtx_send_set_flushing (rtx, FALSE);
+ gst_pad_start_task (rtx->srcpad,
+ (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
+ return TRUE;
case GST_EVENT_CAPS:
{
GstCaps *caps;
return (guint32) gst_util_uint64_scale_int (result, 1000, data->clock_rate);
}
-/* push pending retransmission packet.
- * it constructs rtx packet from original packets */
-static void
-do_push (GstBuffer * buffer, GstRtpRtxSend * rtx)
-{
- gst_pad_push (rtx->srcpad, buffer);
-}
-
static GstFlowReturn
gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
GstFlowReturn ret = GST_FLOW_ERROR;
- GQueue *pending = NULL;
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
BufferQueueItem *item;
SSRCRtxData *data;
}
}
- /* within lock, get packets that have to be retransmited */
- if (g_queue_get_length (rtx->pending) > 0) {
- pending = rtx->pending;
- rtx->pending = g_queue_new ();
-
- /* update statistics - assume we will succeed to retransmit those packets */
- rtx->num_rtx_packets += g_queue_get_length (pending);
- }
-
- /* no need to hold the lock to push rtx packets */
GST_OBJECT_UNLOCK (rtx);
- /* retransmit requested packets */
- if (pending) {
- g_queue_foreach (pending, (GFunc) do_push, rtx);
- g_queue_free (pending);
- }
-
GST_LOG_OBJECT (rtx,
"push seqnum: %" G_GUINT16_FORMAT ", ssrc: %" G_GUINT32_FORMAT, seqnum,
ssrc);
- /* push current rtp packet */
ret = gst_pad_push (rtx->srcpad, buffer);
return ret;
}
static void
+gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx)
+{
+ GstDataQueueItem *data;
+
+ if (gst_data_queue_pop (rtx->queue, &data)) {
+ GST_LOG_OBJECT (rtx, "pushing rtx buffer %p", data->object);
+
+ gst_pad_push (rtx->srcpad, GST_BUFFER (data->object));
+
+ GST_OBJECT_LOCK (rtx);
+ rtx->num_rtx_packets++;
+ GST_OBJECT_UNLOCK (rtx);
+
+ data->object = NULL; /* we no longer own that object */
+ data->destroy (data);
+ } else {
+ GST_LOG_OBJECT (rtx, "flushing");
+ gst_pad_pause_task (rtx->srcpad);
+ }
+}
+
+static gboolean
+gst_rtp_rtx_send_activate_mode (GstPad * pad, GstObject * parent,
+ GstPadMode mode, gboolean active)
+{
+ GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
+ gboolean ret = FALSE;
+
+ switch (mode) {
+ case GST_PAD_MODE_PUSH:
+ if (active) {
+ gst_rtp_rtx_send_set_flushing (rtx, FALSE);
+ ret = gst_pad_start_task (rtx->srcpad,
+ (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
+ } else {
+ gst_rtp_rtx_send_set_flushing (rtx, TRUE);
+ ret = gst_pad_stop_task (rtx->srcpad);
+ }
+ GST_INFO_OBJECT (rtx, "activate_mode: active %d, ret %d", active, ret);
+ break;
+ default:
+ break;
+ }
+ return ret;
+}
+
+static void
gst_rtp_rtx_send_get_property (GObject * object,
guint prop_id, GValue * value, GParamSpec * pspec)
{