G_DEFINE_TYPE (GstRtpRtxSend, gst_rtp_rtx_send, GST_TYPE_ELEMENT);
+typedef struct
+{
+ guint16 seqnum;
+ guint32 timestamp;
+ GstBuffer *buffer;
+} BufferQueueItem;
+
+static void
+buffer_queue_item_free (BufferQueueItem * item)
+{
+ gst_buffer_unref (item->buffer);
+ g_free (item);
+}
+
static void
gst_rtp_rtx_send_class_init (GstRtpRtxSendClass * klass)
{
gst_rtp_rtx_send_reset (GstRtpRtxSend * rtx, gboolean full)
{
g_mutex_lock (&rtx->lock);
- g_queue_foreach (rtx->queue, (GFunc) gst_buffer_unref, NULL);
+ g_queue_foreach (rtx->queue, (GFunc) buffer_queue_item_free, NULL);
g_queue_clear (rtx->queue);
g_list_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL);
g_list_free (rtx->pending);
/* traverse queue history and try to find the buffer that the
* requested seqnum */
static void
-push_seqnum (GstBuffer * buffer, RTXData * data)
+push_seqnum (BufferQueueItem * item, RTXData * data)
{
GstRtpRtxSend *rtx = data->rtx;
- GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
- guint16 seqnum;
if (data->found)
return;
- if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpbuffer))
- return;
-
- seqnum = gst_rtp_buffer_get_seq (&rtpbuffer);
- gst_rtp_buffer_unmap (&rtpbuffer);
-
/* data->seqnum comes from the request */
- if (seqnum == data->seqnum) {
+ if (item->seqnum == data->seqnum) {
data->found = TRUE;
- GST_DEBUG_OBJECT (rtx, "found %" G_GUINT16_FORMAT, seqnum);
- rtx->pending = g_list_prepend (rtx->pending, gst_buffer_ref (buffer));
+ GST_DEBUG_OBJECT (rtx, "found %" G_GUINT16_FORMAT, item->seqnum);
+ rtx->pending = g_list_prepend (rtx->pending, gst_buffer_ref (item->buffer));
}
}
GstFlowReturn ret = GST_FLOW_ERROR;
GList *pending = NULL;
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
- guint seqnum = 0;
+ BufferQueueItem *item;
+ guint16 seqnum;
+ guint32 ssrc, rtptime;
- g_mutex_lock (&rtx->lock);
+ rtx = GST_RTP_RTX_SEND (parent);
- /* retrievemaster stream ssrc */
+ /* read the information we want from the buffer */
gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
- rtx->master_ssrc = gst_rtp_buffer_get_ssrc (&rtp);
seqnum = gst_rtp_buffer_get_seq (&rtp);
+ ssrc = gst_rtp_buffer_get_ssrc (&rtp);
+ rtptime = gst_rtp_buffer_get_timestamp (&rtp);
gst_rtp_buffer_unmap (&rtp);
+ g_mutex_lock (&rtx->lock);
+
+ /* retrieve master stream ssrc */
+ rtx->master_ssrc = ssrc;
/* check if our initial aux ssrc is equal to master */
if (rtx->rtx_ssrc == rtx->master_ssrc)
choose_ssrc (rtx);
/* add current rtp buffer to queue history */
- g_queue_push_head (rtx->queue, gst_buffer_ref (buffer));
+ item = g_new0 (BufferQueueItem, 1);
+ item->seqnum = seqnum;
+ item->timestamp = rtptime;
+ item->buffer = gst_buffer_ref (buffer);
+ g_queue_push_head (rtx->queue, item);
/* remove oldest packets from history if they are too many */
if (rtx->max_size_packets) {
while (g_queue_get_length (rtx->queue) > rtx->max_size_packets)
- gst_buffer_unref (g_queue_pop_tail (rtx->queue));
+ buffer_queue_item_free (g_queue_pop_tail (rtx->queue));
}
/* within lock, get packets that have to be retransmited */
pending = rtx->pending;
rtx->pending = NULL;
- /* assume we will succeed to retransmit those packets */
+ /* update statistics - assume we will succeed to retransmit those packets */
rtx->num_rtx_packets += g_list_length (pending);
/* transfer payload type while holding the lock */
rtx->rtx_payload_type = rtx->rtx_payload_type_pending;
+ /* no need to hold the lock to push rtx packets */
g_mutex_unlock (&rtx->lock);
- /* no need to hold the lock to push rtx packets */
+ /* retransmit requested packets */
g_list_foreach (pending, (GFunc) do_push, rtx);
g_list_foreach (pending, (GFunc) gst_buffer_unref, NULL);
g_list_free (pending);