/**
* SECTION:element-rtprtxsend
+ * @title: rtprtxsend
*
* See #GstRtpRtxReceive for examples
- *
+ *
* The purpose of the sender RTX object is to keep a history of RTP packets up
* to a configurable limit (max-size-time or max-size-packets). It will listen
* for upstream custom retransmission events (GstRTPRetransmissionRequest) that
PROP_MAX_SIZE_PACKETS,
PROP_NUM_RTX_REQUESTS,
PROP_NUM_RTX_PACKETS,
- PROP_LAST
+ PROP_CLOCK_RATE_MAP,
};
static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
GST_STATIC_CAPS ("application/x-rtp")
);
+static gboolean gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue,
+ guint visible, guint bytes, guint64 time, gpointer checkdata);
+
static gboolean gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent,
GstEvent * event);
static gboolean gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent,
GstEvent * event);
static GstFlowReturn gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent,
GstBuffer * buffer);
+static GstFlowReturn gst_rtp_rtx_send_chain_list (GstPad * pad,
+ GstObject * parent, GstBufferList * list);
+
+static void gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx);
+static gboolean gst_rtp_rtx_send_activate_mode (GstPad * pad,
+ GstObject * parent, GstPadMode mode, gboolean active);
static GstStateChangeReturn gst_rtp_rtx_send_change_state (GstElement *
element, GstStateChange transition);
typedef struct
{
guint32 rtx_ssrc;
- guint16 next_seqnum;
+ guint16 seqnum_base, next_seqnum;
gint clock_rate;
/* history of rtp packets */
SSRCRtxData *data = g_slice_new0 (SSRCRtxData);
data->rtx_ssrc = rtx_ssrc;
- data->next_seqnum = g_random_int_range (0, G_MAXUINT16);
+ data->next_seqnum = data->seqnum_base = g_random_int_range (0, G_MAXUINT16);
data->queue = g_sequence_new ((GDestroyNotify) buffer_queue_item_free);
return data;
" Number of retransmission packets sent", 0, G_MAXUINT,
0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&src_factory));
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&sink_factory));
+ g_object_class_install_property (gobject_class, PROP_CLOCK_RATE_MAP,
+ g_param_spec_boxed ("clock-rate-map", "Clock Rate Map",
+ "Map of payload types to their clock rates",
+ GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
+ gst_element_class_add_static_pad_template (gstelement_class, &sink_factory);
gst_element_class_set_static_metadata (gstelement_class,
"RTP Retransmission Sender", "Codec",
gst_rtp_rtx_send_reset (GstRtpRtxSend * rtx)
{
GST_OBJECT_LOCK (rtx);
- g_queue_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL);
- g_queue_clear (rtx->pending);
+ gst_data_queue_flush (rtx->queue);
g_hash_table_remove_all (rtx->ssrc_data);
g_hash_table_remove_all (rtx->rtx_ssrcs);
rtx->num_rtx_requests = 0;
g_hash_table_unref (rtx->rtx_pt_map);
if (rtx->rtx_pt_map_structure)
gst_structure_free (rtx->rtx_pt_map_structure);
- g_queue_free_full (rtx->pending, (GDestroyNotify) gst_buffer_unref);
+ g_hash_table_unref (rtx->clock_rate_map);
+ if (rtx->clock_rate_map_structure)
+ gst_structure_free (rtx->clock_rate_map_structure);
+ g_object_unref (rtx->queue);
G_OBJECT_CLASS (gst_rtp_rtx_send_parent_class)->finalize (object);
}
GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad);
gst_pad_set_event_function (rtx->srcpad,
GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_src_event));
+ gst_pad_set_activatemode_function (rtx->srcpad,
+ GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_activate_mode));
gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad);
rtx->sinkpad =
GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_sink_event));
gst_pad_set_chain_function (rtx->sinkpad,
GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_chain));
+ gst_pad_set_chain_list_function (rtx->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_chain_list));
gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
- rtx->pending = g_queue_new ();
+ rtx->queue = gst_data_queue_new (gst_rtp_rtx_send_queue_check_full, NULL,
+ NULL, rtx);
rtx->ssrc_data = g_hash_table_new_full (g_direct_hash, g_direct_equal,
NULL, (GDestroyNotify) ssrc_rtx_data_free);
rtx->rtx_ssrcs = g_hash_table_new (g_direct_hash, g_direct_equal);
rtx->rtx_pt_map = g_hash_table_new (g_direct_hash, g_direct_equal);
+ rtx->clock_rate_map = g_hash_table_new (g_direct_hash, g_direct_equal);
rtx->max_size_time = DEFAULT_MAX_SIZE_TIME;
rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS;
}
+static void
+gst_rtp_rtx_send_set_flushing (GstRtpRtxSend * rtx, gboolean flush)
+{
+ GST_OBJECT_LOCK (rtx);
+ gst_data_queue_set_flushing (rtx->queue, flush);
+ gst_data_queue_flush (rtx->queue);
+ GST_OBJECT_UNLOCK (rtx);
+}
+
+static gboolean
+gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue,
+ guint visible, guint bytes, guint64 time, gpointer checkdata)
+{
+ return FALSE;
+}
+
+static void
+gst_rtp_rtx_data_queue_item_free (gpointer item)
+{
+ GstDataQueueItem *data = item;
+ if (data->object)
+ gst_mini_object_unref (data->object);
+ g_slice_free (GstDataQueueItem, data);
+}
+
+static gboolean
+gst_rtp_rtx_send_push_out (GstRtpRtxSend * rtx, gpointer object)
+{
+ GstDataQueueItem *data;
+ gboolean success;
+
+ data = g_slice_new0 (GstDataQueueItem);
+ data->object = GST_MINI_OBJECT (object);
+ data->size = 1;
+ data->duration = 1;
+ data->visible = TRUE;
+ data->destroy = gst_rtp_rtx_data_queue_item_free;
+
+ success = gst_data_queue_push (rtx->queue, data);
+
+ if (!success)
+ data->destroy (data);
+
+ return success;
+}
+
static guint32
gst_rtp_rtx_send_choose_ssrc (GstRtpRtxSend * rtx, guint32 choice,
gboolean consider_choice)
gst_rtp_rtx_send_get_ssrc_data (GstRtpRtxSend * rtx, guint32 ssrc)
{
SSRCRtxData *data;
- guint32 rtx_ssrc;
+ guint32 rtx_ssrc = 0;
gboolean consider = FALSE;
if (G_UNLIKELY (!g_hash_table_contains (rtx->ssrc_data,
return data;
}
+/* Copy fixed header and extension. Add OSN before to copy payload
+ * Copy memory to avoid to manually copy each rtp buffer field.
+ */
+static GstBuffer *
+gst_rtp_rtx_buffer_new (GstRtpRtxSend * rtx, GstBuffer * buffer)
+{
+ GstMemory *mem = NULL;
+ GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
+ GstRTPBuffer new_rtp = GST_RTP_BUFFER_INIT;
+ GstBuffer *new_buffer = gst_buffer_new ();
+ GstMapInfo map;
+ guint payload_len = 0;
+ SSRCRtxData *data;
+ guint32 ssrc;
+ guint16 seqnum;
+ guint8 fmtp;
+
+ gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
+
+ /* get needed data from GstRtpRtxSend */
+ ssrc = gst_rtp_buffer_get_ssrc (&rtp);
+ data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
+ ssrc = data->rtx_ssrc;
+ seqnum = data->next_seqnum++;
+ fmtp = GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_pt_map,
+ GUINT_TO_POINTER (gst_rtp_buffer_get_payload_type (&rtp))));
+
+ GST_DEBUG_OBJECT (rtx, "creating rtx buffer, orig seqnum: %u, "
+ "rtx seqnum: %u, rtx ssrc: %X", gst_rtp_buffer_get_seq (&rtp),
+ seqnum, ssrc);
+
+ /* gst_rtp_buffer_map does not map the payload so do it now */
+ gst_rtp_buffer_get_payload (&rtp);
+
+ /* copy fixed header */
+ mem = gst_memory_copy (rtp.map[0].memory, 0, rtp.size[0]);
+ gst_buffer_append_memory (new_buffer, mem);
+
+ /* copy extension if any */
+ if (rtp.size[1]) {
+ mem = gst_allocator_alloc (NULL, rtp.size[1], NULL);
+ gst_memory_map (mem, &map, GST_MAP_WRITE);
+ memcpy (map.data, rtp.data[1], rtp.size[1]);
+ gst_memory_unmap (mem, &map);
+ gst_buffer_append_memory (new_buffer, mem);
+ }
+
+ /* copy payload and add OSN just before */
+ payload_len = 2 + rtp.size[2];
+ mem = gst_allocator_alloc (NULL, payload_len, NULL);
+
+ gst_memory_map (mem, &map, GST_MAP_WRITE);
+ GST_WRITE_UINT16_BE (map.data, gst_rtp_buffer_get_seq (&rtp));
+ if (rtp.size[2])
+ memcpy (map.data + 2, rtp.data[2], rtp.size[2]);
+ gst_memory_unmap (mem, &map);
+ gst_buffer_append_memory (new_buffer, mem);
+
+ /* everything needed is copied */
+ gst_rtp_buffer_unmap (&rtp);
+
+ /* set ssrc, seqnum and fmtp */
+ gst_rtp_buffer_map (new_buffer, GST_MAP_WRITE, &new_rtp);
+ gst_rtp_buffer_set_ssrc (&new_rtp, ssrc);
+ gst_rtp_buffer_set_seq (&new_rtp, seqnum);
+ gst_rtp_buffer_set_payload_type (&new_rtp, fmtp);
+ /* RFC 4588: let other elements do the padding, as normal */
+ gst_rtp_buffer_set_padding (&new_rtp, FALSE);
+ gst_rtp_buffer_unmap (&new_rtp);
+
+ /* Copy over timestamps */
+ gst_buffer_copy_into (new_buffer, buffer, GST_BUFFER_COPY_TIMESTAMPS, 0, -1);
+
+ return new_buffer;
+}
+
static gint
buffer_queue_items_cmp (BufferQueueItem * a, BufferQueueItem * b,
gpointer user_data)
if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
guint seqnum = 0;
guint ssrc = 0;
+ GstBuffer *rtx_buf = NULL;
- /* retrieve seqnum of the packet that need to be restransmisted */
+ /* retrieve seqnum of the packet that need to be retransmitted */
if (!gst_structure_get_uint (s, "seqnum", &seqnum))
seqnum = -1;
- /* retrieve ssrc of the packet that need to be restransmisted */
+ /* retrieve ssrc of the packet that need to be retransmitted */
if (!gst_structure_get_uint (s, "ssrc", &ssrc))
ssrc = -1;
- GST_DEBUG_OBJECT (rtx,
- "request seqnum: %" G_GUINT32_FORMAT ", ssrc: %" G_GUINT32_FORMAT,
+ GST_DEBUG_OBJECT (rtx, "got rtx request for seqnum: %u, ssrc: %X",
seqnum, ssrc);
GST_OBJECT_LOCK (rtx);
(GCompareDataFunc) buffer_queue_items_cmp, NULL);
if (iter) {
BufferQueueItem *item = g_sequence_get (iter);
- GST_DEBUG_OBJECT (rtx, "found %" G_GUINT16_FORMAT, item->seqnum);
- g_queue_push_tail (rtx->pending, gst_buffer_ref (item->buffer));
+ GST_LOG_OBJECT (rtx, "found %u", item->seqnum);
+ rtx_buf = gst_rtp_rtx_buffer_new (rtx, item->buffer);
}
+#ifndef GST_DISABLE_DEBUG
+ else {
+ BufferQueueItem *item = NULL;
+
+ iter = g_sequence_get_begin_iter (data->queue);
+ if (!g_sequence_iter_is_end (iter))
+ item = g_sequence_get (iter);
+
+ if (item && seqnum < item->seqnum) {
+ GST_DEBUG_OBJECT (rtx, "requested seqnum %u has already been "
+ "removed from the rtx queue; the first available is %u",
+ seqnum, item->seqnum);
+ } else {
+ GST_WARNING_OBJECT (rtx, "requested seqnum %u has not been "
+ "transmitted yet in the original stream; either the remote end "
+ "is not configured correctly, or the source is too slow",
+ seqnum);
+ }
+ }
+#endif
}
GST_OBJECT_UNLOCK (rtx);
+ if (rtx_buf)
+ gst_rtp_rtx_send_push_out (rtx, rtx_buf);
+
gst_event_unref (event);
res = TRUE;
if (!gst_structure_get_uint (s, "ssrc", &ssrc))
ssrc = -1;
- GST_DEBUG_OBJECT (rtx, "collision ssrc: %" G_GUINT32_FORMAT, ssrc);
+ GST_DEBUG_OBJECT (rtx, "got ssrc collision, ssrc: %X", ssrc);
GST_OBJECT_LOCK (rtx);
- /* choose another ssrc for our retransmited stream */
+ /* choose another ssrc for our retransmitted stream */
if (g_hash_table_contains (rtx->rtx_ssrcs, GUINT_TO_POINTER (ssrc))) {
guint master_ssrc;
SSRCRtxData *data;
GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_FLUSH_START:
+ gst_pad_push_event (rtx->srcpad, event);
+ gst_rtp_rtx_send_set_flushing (rtx, TRUE);
+ gst_pad_pause_task (rtx->srcpad);
+ return TRUE;
+ case GST_EVENT_FLUSH_STOP:
+ gst_pad_push_event (rtx->srcpad, event);
+ gst_rtp_rtx_send_set_flushing (rtx, FALSE);
+ gst_pad_start_task (rtx->srcpad,
+ (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
+ return TRUE;
+ case GST_EVENT_EOS:
+ GST_INFO_OBJECT (rtx, "Got EOS - enqueueing it");
+ gst_rtp_rtx_send_push_out (rtx, event);
+ return TRUE;
case GST_EVENT_CAPS:
{
GstCaps *caps;
GstStructure *s;
guint ssrc;
+ gint payload;
+ gpointer rtx_payload;
SSRCRtxData *data;
gst_event_parse_caps (event, &caps);
- g_assert (gst_caps_is_fixed (caps));
s = gst_caps_get_structure (caps, 0);
- gst_structure_get_uint (s, "ssrc", &ssrc);
+ if (!gst_structure_get_uint (s, "ssrc", &ssrc))
+ ssrc = -1;
+ if (!gst_structure_get_int (s, "payload", &payload))
+ payload = -1;
+
+ if (payload == -1 || ssrc == G_MAXUINT)
+ break;
+
+ if (payload == -1)
+ GST_WARNING_OBJECT (rtx, "No payload in caps");
+
+ GST_OBJECT_LOCK (rtx);
data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
+ if (!g_hash_table_lookup_extended (rtx->rtx_pt_map,
+ GUINT_TO_POINTER (payload), NULL, &rtx_payload))
+ rtx_payload = GINT_TO_POINTER (-1);
+
+ if (GPOINTER_TO_INT (rtx_payload) == -1 && payload != -1)
+ GST_WARNING_OBJECT (rtx, "Payload %d not in rtx-pt-map", payload);
+
+ GST_DEBUG_OBJECT (rtx,
+ "got caps for payload: %d->%d, ssrc: %u->%u : %" GST_PTR_FORMAT,
+ payload, GPOINTER_TO_INT (rtx_payload), ssrc, data->rtx_ssrc, caps);
+
gst_structure_get_int (s, "clock-rate", &data->clock_rate);
+ /* The session might need to know the RTX ssrc */
+ caps = gst_caps_copy (caps);
+ gst_caps_set_simple (caps, "rtx-ssrc", G_TYPE_UINT, data->rtx_ssrc,
+ "rtx-seqnum-offset", G_TYPE_UINT, data->seqnum_base, NULL);
+
+ if (GPOINTER_TO_INT (rtx_payload) != -1)
+ gst_caps_set_simple (caps, "rtx-payload", G_TYPE_INT,
+ GPOINTER_TO_INT (rtx_payload), NULL);
+
GST_DEBUG_OBJECT (rtx, "got clock-rate from caps: %d for ssrc: %u",
data->clock_rate, ssrc);
+ GST_OBJECT_UNLOCK (rtx);
+ gst_event_unref (event);
+ event = gst_event_new_caps (caps);
+ gst_caps_unref (caps);
break;
}
default:
return (guint32) gst_util_uint64_scale_int (result, 1000, data->clock_rate);
}
-/* Copy fixed header and extension. Add OSN before to copy payload
- * Copy memory to avoid to manually copy each rtp buffer field.
- */
-static GstBuffer *
-_gst_rtp_rtx_buffer_new (GstRtpRtxSend * rtx, GstBuffer * buffer)
-{
- GstMemory *mem = NULL;
- GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
- GstRTPBuffer new_rtp = GST_RTP_BUFFER_INIT;
- GstBuffer *new_buffer = gst_buffer_new ();
- GstMapInfo map;
- guint payload_len = 0;
- SSRCRtxData *data;
- guint32 ssrc;
- guint16 seqnum;
- guint8 fmtp;
-
- gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
-
- /* get needed data from GstRtpRtxSend */
- ssrc = gst_rtp_buffer_get_ssrc (&rtp);
- data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
- ssrc = data->rtx_ssrc;
- seqnum = data->next_seqnum++;
- fmtp = GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_pt_map,
- GUINT_TO_POINTER (gst_rtp_buffer_get_payload_type (&rtp))));
-
- GST_DEBUG_OBJECT (rtx,
- "retransmit seqnum: %" G_GUINT16_FORMAT ", ssrc: %" G_GUINT32_FORMAT,
- seqnum, ssrc);
-
- /* gst_rtp_buffer_map does not map the payload so do it now */
- gst_rtp_buffer_get_payload (&rtp);
-
- /* If payload type is not set through SDP/property then
- * just bump the value */
- if (fmtp < 96)
- fmtp = gst_rtp_buffer_get_payload_type (&rtp) + 1;
-
- /* copy fixed header */
- mem = gst_memory_copy (rtp.map[0].memory, 0, rtp.size[0]);
- gst_buffer_append_memory (new_buffer, mem);
-
- /* copy extension if any */
- if (rtp.size[1]) {
- mem = gst_memory_copy (rtp.map[1].memory, 0, rtp.size[1]);
- gst_buffer_append_memory (new_buffer, mem);
- }
-
- /* copy payload and add OSN just before */
- payload_len = 2 + rtp.size[2];
- mem = gst_allocator_alloc (NULL, payload_len, NULL);
-
- gst_memory_map (mem, &map, GST_MAP_WRITE);
- GST_WRITE_UINT16_BE (map.data, gst_rtp_buffer_get_seq (&rtp));
- if (rtp.size[2])
- memcpy (map.data + 2, rtp.data[2], rtp.size[2]);
- gst_memory_unmap (mem, &map);
- gst_buffer_append_memory (new_buffer, mem);
-
- /* everything needed is copied */
- gst_rtp_buffer_unmap (&rtp);
-
- /* set ssrc, seqnum and fmtp */
- gst_rtp_buffer_map (new_buffer, GST_MAP_WRITE, &new_rtp);
- gst_rtp_buffer_set_ssrc (&new_rtp, ssrc);
- gst_rtp_buffer_set_seq (&new_rtp, seqnum);
- gst_rtp_buffer_set_payload_type (&new_rtp, fmtp);
- /* RFC 4588: let other elements do the padding, as normal */
- gst_rtp_buffer_set_padding (&new_rtp, FALSE);
- gst_rtp_buffer_unmap (&new_rtp);
-
- return new_buffer;
-}
-
-/* push pending retransmission packet.
- * it constructs rtx packet from original packets */
+/* Must be called with lock */
static void
-do_push (GstBuffer * buffer, GstRtpRtxSend * rtx)
+process_buffer (GstRtpRtxSend * rtx, GstBuffer * buffer)
{
- gst_pad_push (rtx->srcpad, _gst_rtp_rtx_buffer_new (rtx, buffer));
-}
-
-static GstFlowReturn
-gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
-{
- GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
- GstFlowReturn ret = GST_FLOW_ERROR;
- GQueue *pending = NULL;
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
BufferQueueItem *item;
SSRCRtxData *data;
rtptime = gst_rtp_buffer_get_timestamp (&rtp);
gst_rtp_buffer_unmap (&rtp);
- GST_OBJECT_LOCK (rtx);
+ GST_TRACE_OBJECT (rtx, "Processing buffer seqnum: %u, ssrc: %X", seqnum,
+ ssrc);
/* do not store the buffer if it's payload type is unknown */
if (g_hash_table_contains (rtx->rtx_pt_map, GUINT_TO_POINTER (payload_type))) {
data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
+ if (data->clock_rate == 0 && rtx->clock_rate_map_structure) {
+ data->clock_rate =
+ GPOINTER_TO_INT (g_hash_table_lookup (rtx->clock_rate_map,
+ GUINT_TO_POINTER (payload_type)));
+ }
+
/* add current rtp buffer to queue history */
item = g_slice_new0 (BufferQueueItem);
item->seqnum = seqnum;
g_sequence_remove (g_sequence_get_begin_iter (data->queue));
}
}
+}
- /* within lock, get packets that have to be retransmited */
- if (g_queue_get_length (rtx->pending) > 0) {
- pending = rtx->pending;
- rtx->pending = g_queue_new ();
+static GstFlowReturn
+gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+{
+ GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
+ GstFlowReturn ret;
- /* update statistics - assume we will succeed to retransmit those packets */
- rtx->num_rtx_packets += g_queue_get_length (pending);
- }
+ GST_OBJECT_LOCK (rtx);
+ process_buffer (rtx, buffer);
+ GST_OBJECT_UNLOCK (rtx);
+ ret = gst_pad_push (rtx->srcpad, buffer);
+
+ return ret;
+}
+
+static gboolean
+process_buffer_from_list (GstBuffer ** buffer, guint idx, gpointer user_data)
+{
+ process_buffer (user_data, *buffer);
+ return TRUE;
+}
- /* no need to hold the lock to push rtx packets */
+static GstFlowReturn
+gst_rtp_rtx_send_chain_list (GstPad * pad, GstObject * parent,
+ GstBufferList * list)
+{
+ GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
+ GstFlowReturn ret;
+
+ GST_OBJECT_LOCK (rtx);
+ gst_buffer_list_foreach (list, process_buffer_from_list, rtx);
GST_OBJECT_UNLOCK (rtx);
- /* retransmit requested packets */
- if (pending) {
- g_queue_foreach (pending, (GFunc) do_push, rtx);
- g_queue_free_full (pending, (GDestroyNotify) gst_buffer_unref);
- }
+ ret = gst_pad_push_list (rtx->srcpad, list);
- GST_LOG_OBJECT (rtx,
- "push seqnum: %" G_GUINT16_FORMAT ", ssrc: %" G_GUINT32_FORMAT, seqnum,
- ssrc);
+ return ret;
+}
- /* push current rtp packet */
- ret = gst_pad_push (rtx->srcpad, buffer);
+static void
+gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx)
+{
+ GstDataQueueItem *data;
+
+ if (gst_data_queue_pop (rtx->queue, &data)) {
+ GST_LOG_OBJECT (rtx, "pushing rtx buffer %p", data->object);
+
+ if (G_LIKELY (GST_IS_BUFFER (data->object))) {
+ GST_OBJECT_LOCK (rtx);
+ /* Update statistics just before pushing. */
+ rtx->num_rtx_packets++;
+ GST_OBJECT_UNLOCK (rtx);
+
+ gst_pad_push (rtx->srcpad, GST_BUFFER (data->object));
+ } else if (GST_IS_EVENT (data->object)) {
+ gst_pad_push_event (rtx->srcpad, GST_EVENT (data->object));
+
+ /* after EOS, we should not send any more buffers,
+ * even if there are more requests coming in */
+ if (GST_EVENT_TYPE (data->object) == GST_EVENT_EOS) {
+ gst_rtp_rtx_send_set_flushing (rtx, TRUE);
+ }
+ } else {
+ g_assert_not_reached ();
+ }
+
+ data->object = NULL; /* we no longer own that object */
+ data->destroy (data);
+ } else {
+ GST_LOG_OBJECT (rtx, "flushing");
+ gst_pad_pause_task (rtx->srcpad);
+ }
+}
+static gboolean
+gst_rtp_rtx_send_activate_mode (GstPad * pad, GstObject * parent,
+ GstPadMode mode, gboolean active)
+{
+ GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
+ gboolean ret = FALSE;
+
+ switch (mode) {
+ case GST_PAD_MODE_PUSH:
+ if (active) {
+ gst_rtp_rtx_send_set_flushing (rtx, FALSE);
+ ret = gst_pad_start_task (rtx->srcpad,
+ (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
+ } else {
+ gst_rtp_rtx_send_set_flushing (rtx, TRUE);
+ ret = gst_pad_stop_task (rtx->srcpad);
+ }
+ GST_INFO_OBJECT (rtx, "activate_mode: active %d, ret %d", active, ret);
+ break;
+ default:
+ break;
+ }
return ret;
}
g_value_set_uint (value, rtx->num_rtx_packets);
GST_OBJECT_UNLOCK (rtx);
break;
+ case PROP_CLOCK_RATE_MAP:
+ GST_OBJECT_LOCK (rtx);
+ g_value_set_boxed (value, rtx->clock_rate_map_structure);
+ GST_OBJECT_UNLOCK (rtx);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
rtx->max_size_packets = g_value_get_uint (value);
GST_OBJECT_UNLOCK (rtx);
break;
+ case PROP_CLOCK_RATE_MAP:
+ GST_OBJECT_LOCK (rtx);
+ if (rtx->clock_rate_map_structure)
+ gst_structure_free (rtx->clock_rate_map_structure);
+ rtx->clock_rate_map_structure = g_value_dup_boxed (value);
+ g_hash_table_remove_all (rtx->clock_rate_map);
+ gst_structure_foreach (rtx->clock_rate_map_structure,
+ structure_to_hash_table, rtx->clock_rate_map);
+ GST_OBJECT_UNLOCK (rtx);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;