927fb1cf06043272a3ebf858f8d4934297446e48
[platform/upstream/gstreamer.git] / gst / rist / gstristrtxsend.c
1 /* RIST Retransmission sender element for GStreamer
2  *
3  * gsristprtxsend.c:
4  *
5  * Copyright (C) 2013-2019 Collabora Ltd.
6  *   @author Julien Isorce <julien.isorce@collabora.co.uk>
7  *           Nicoas Dufresne <nicolas.dufresne@collabora.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24
25 /**
26  * SECTION:element-ristrtxsend
27  * @title: ristrtxsend
28  * @see_also: ristrtxreceive
29  *
30  * This elements replies to custom events 'GstRTPRetransmissionRequest' and
31  * when available sends in RIST form the lost packet. This element is intented
32  * to be used by ristsink element.
33  */
34
35 #ifdef HAVE_CONFIG_H
36 #include "config.h"
37 #endif
38
39 #include <gst/gst.h>
40 #include <gst/rtp/gstrtpbuffer.h>
41 #include <gst/base/gstdataqueue.h>
42
43 #include "gstrist.h"
44
45 GST_DEBUG_CATEGORY_STATIC (gst_rist_rtx_send_debug);
46 #define GST_CAT_DEFAULT gst_rist_rtx_send_debug
47
48 #define DEFAULT_MAX_SIZE_TIME    0
49 #define DEFAULT_MAX_SIZE_PACKETS 100
50
51 enum
52 {
53   PROP_0,
54   PROP_MAX_SIZE_TIME,
55   PROP_MAX_SIZE_PACKETS,
56   PROP_NUM_RTX_REQUESTS,
57   PROP_NUM_RTX_PACKETS,
58 };
59
60 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
61     GST_PAD_SRC,
62     GST_PAD_ALWAYS,
63     GST_STATIC_CAPS ("application/x-rtp")
64     );
65
66 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink",
67     GST_PAD_SINK,
68     GST_PAD_ALWAYS,
69     GST_STATIC_CAPS ("application/x-rtp, " "clock-rate = (int) [1, MAX]")
70     );
71
72 struct _GstRistRtxSend
73 {
74   GstElement element;
75
76   /* pad */
77   GstPad *sinkpad;
78   GstPad *srcpad;
79
80   /* rtp packets that will be pushed out */
81   GstDataQueue *queue;
82
83   /* ssrc -> SSRCRtxData */
84   GHashTable *ssrc_data;
85   /* rtx ssrc -> master ssrc */
86   GHashTable *rtx_ssrcs;
87
88   /* buffering control properties */
89   guint max_size_time;
90   guint max_size_packets;
91
92   /* statistics */
93   guint num_rtx_requests;
94   guint num_rtx_packets;
95 };
96
97 static gboolean gst_rist_rtx_send_queue_check_full (GstDataQueue * queue,
98     guint visible, guint bytes, guint64 time, gpointer checkdata);
99
100 static gboolean gst_rist_rtx_send_src_event (GstPad * pad, GstObject * parent,
101     GstEvent * event);
102 static gboolean gst_rist_rtx_send_sink_event (GstPad * pad, GstObject * parent,
103     GstEvent * event);
104 static GstFlowReturn gst_rist_rtx_send_chain (GstPad * pad, GstObject * parent,
105     GstBuffer * buffer);
106 static GstFlowReturn gst_rist_rtx_send_chain_list (GstPad * pad,
107     GstObject * parent, GstBufferList * list);
108
109 static void gst_rist_rtx_send_src_loop (GstRistRtxSend * rtx);
110 static gboolean gst_rist_rtx_send_activate_mode (GstPad * pad,
111     GstObject * parent, GstPadMode mode, gboolean active);
112
113 static GstStateChangeReturn gst_rist_rtx_send_change_state (GstElement *
114     element, GstStateChange transition);
115
116 static void gst_rist_rtx_send_set_property (GObject * object, guint prop_id,
117     const GValue * value, GParamSpec * pspec);
118 static void gst_rist_rtx_send_get_property (GObject * object, guint prop_id,
119     GValue * value, GParamSpec * pspec);
120 static void gst_rist_rtx_send_finalize (GObject * object);
121
122 G_DEFINE_TYPE_WITH_CODE (GstRistRtxSend, gst_rist_rtx_send, GST_TYPE_ELEMENT,
123     GST_DEBUG_CATEGORY_INIT (gst_rist_rtx_send_debug, "ristrtxsend", 0,
124         "RIST retransmission sender"));
125
126 typedef struct
127 {
128   guint16 seqnum;
129   guint32 timestamp;
130   GstBuffer *buffer;
131 } BufferQueueItem;
132
133 static void
134 buffer_queue_item_free (BufferQueueItem * item)
135 {
136   gst_buffer_unref (item->buffer);
137   g_slice_free (BufferQueueItem, item);
138 }
139
140 typedef struct
141 {
142   guint32 rtx_ssrc;
143   guint16 seqnum_base, next_seqnum;
144   gint clock_rate;
145
146   /* history of rtp packets */
147   GSequence *queue;
148 } SSRCRtxData;
149
150 static SSRCRtxData *
151 ssrc_rtx_data_new (guint32 rtx_ssrc)
152 {
153   SSRCRtxData *data = g_slice_new0 (SSRCRtxData);
154
155   data->rtx_ssrc = rtx_ssrc;
156   data->next_seqnum = data->seqnum_base = g_random_int_range (0, G_MAXUINT16);
157   data->queue = g_sequence_new ((GDestroyNotify) buffer_queue_item_free);
158
159   return data;
160 }
161
162 static void
163 ssrc_rtx_data_free (SSRCRtxData * data)
164 {
165   g_sequence_free (data->queue);
166   g_slice_free (SSRCRtxData, data);
167 }
168
169 static void
170 gst_rist_rtx_send_class_init (GstRistRtxSendClass * klass)
171 {
172   GObjectClass *gobject_class;
173   GstElementClass *gstelement_class;
174
175   gobject_class = (GObjectClass *) klass;
176   gstelement_class = (GstElementClass *) klass;
177
178   gobject_class->get_property = gst_rist_rtx_send_get_property;
179   gobject_class->set_property = gst_rist_rtx_send_set_property;
180   gobject_class->finalize = gst_rist_rtx_send_finalize;
181
182   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
183       g_param_spec_uint ("max-size-time", "Max Size Time",
184           "Amount of ms to queue (0 = unlimited)", 0, G_MAXUINT,
185           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
186
187   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_PACKETS,
188       g_param_spec_uint ("max-size-packets", "Max Size Packets",
189           "Amount of packets to queue (0 = unlimited)", 0, G_MAXINT16,
190           DEFAULT_MAX_SIZE_PACKETS,
191           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
192
193   g_object_class_install_property (gobject_class, PROP_NUM_RTX_REQUESTS,
194       g_param_spec_uint ("num-rtx-requests", "Num RTX Requests",
195           "Number of retransmission events received", 0, G_MAXUINT,
196           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
197
198   g_object_class_install_property (gobject_class, PROP_NUM_RTX_PACKETS,
199       g_param_spec_uint ("num-rtx-packets", "Num RTX Packets",
200           " Number of retransmission packets sent", 0, G_MAXUINT,
201           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
202
203   gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
204   gst_element_class_add_static_pad_template (gstelement_class, &sink_factory);
205
206   gst_element_class_set_static_metadata (gstelement_class,
207       "RIST Retransmission Sender", "Codec",
208       "Retransmit RTP packets when needed, according to VSF TR-06-1",
209       "Nicolas Dufresne <nicolas.dufresne@collabora.com>");
210
211   gstelement_class->change_state =
212       GST_DEBUG_FUNCPTR (gst_rist_rtx_send_change_state);
213 }
214
215 static void
216 gst_rist_rtx_send_reset (GstRistRtxSend * rtx)
217 {
218   GST_OBJECT_LOCK (rtx);
219   gst_data_queue_flush (rtx->queue);
220   g_hash_table_remove_all (rtx->ssrc_data);
221   g_hash_table_remove_all (rtx->rtx_ssrcs);
222   rtx->num_rtx_requests = 0;
223   rtx->num_rtx_packets = 0;
224   GST_OBJECT_UNLOCK (rtx);
225 }
226
227 static void
228 gst_rist_rtx_send_finalize (GObject * object)
229 {
230   GstRistRtxSend *rtx = GST_RIST_RTX_SEND (object);
231
232   g_hash_table_unref (rtx->ssrc_data);
233   g_hash_table_unref (rtx->rtx_ssrcs);
234   g_object_unref (rtx->queue);
235
236   G_OBJECT_CLASS (gst_rist_rtx_send_parent_class)->finalize (object);
237 }
238
239 static void
240 gst_rist_rtx_send_init (GstRistRtxSend * rtx)
241 {
242   GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx);
243
244   rtx->srcpad =
245       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
246           "src"), "src");
247   GST_PAD_SET_PROXY_CAPS (rtx->srcpad);
248   GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad);
249   gst_pad_set_event_function (rtx->srcpad,
250       GST_DEBUG_FUNCPTR (gst_rist_rtx_send_src_event));
251   gst_pad_set_activatemode_function (rtx->srcpad,
252       GST_DEBUG_FUNCPTR (gst_rist_rtx_send_activate_mode));
253   gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad);
254
255   rtx->sinkpad =
256       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
257           "sink"), "sink");
258   GST_PAD_SET_PROXY_CAPS (rtx->sinkpad);
259   GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
260   gst_pad_set_event_function (rtx->sinkpad,
261       GST_DEBUG_FUNCPTR (gst_rist_rtx_send_sink_event));
262   gst_pad_set_chain_function (rtx->sinkpad,
263       GST_DEBUG_FUNCPTR (gst_rist_rtx_send_chain));
264   gst_pad_set_chain_list_function (rtx->sinkpad,
265       GST_DEBUG_FUNCPTR (gst_rist_rtx_send_chain_list));
266   gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
267
268   rtx->queue = gst_data_queue_new (gst_rist_rtx_send_queue_check_full, NULL,
269       NULL, rtx);
270   rtx->ssrc_data = g_hash_table_new_full (g_direct_hash, g_direct_equal,
271       NULL, (GDestroyNotify) ssrc_rtx_data_free);
272   rtx->rtx_ssrcs = g_hash_table_new (g_direct_hash, g_direct_equal);
273
274   rtx->max_size_time = DEFAULT_MAX_SIZE_TIME;
275   rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS;
276 }
277
278 static void
279 gst_rist_rtx_send_set_flushing (GstRistRtxSend * rtx, gboolean flush)
280 {
281   GST_OBJECT_LOCK (rtx);
282   gst_data_queue_set_flushing (rtx->queue, flush);
283   gst_data_queue_flush (rtx->queue);
284   GST_OBJECT_UNLOCK (rtx);
285 }
286
287 static gboolean
288 gst_rist_rtx_send_queue_check_full (GstDataQueue * queue,
289     guint visible, guint bytes, guint64 time, gpointer checkdata)
290 {
291   return FALSE;
292 }
293
294 static void
295 gst_rtp_rtx_data_queue_item_free (gpointer item)
296 {
297   GstDataQueueItem *data = item;
298   if (data->object)
299     gst_mini_object_unref (data->object);
300   g_slice_free (GstDataQueueItem, data);
301 }
302
303 static gboolean
304 gst_rist_rtx_send_push_out (GstRistRtxSend * rtx, gpointer object)
305 {
306   GstDataQueueItem *data;
307   gboolean success;
308
309   data = g_slice_new0 (GstDataQueueItem);
310   data->object = GST_MINI_OBJECT (object);
311   data->size = 1;
312   data->duration = 1;
313   data->visible = TRUE;
314   data->destroy = gst_rtp_rtx_data_queue_item_free;
315
316   success = gst_data_queue_push (rtx->queue, data);
317
318   if (!success)
319     data->destroy (data);
320
321   return success;
322 }
323
324 static SSRCRtxData *
325 gst_rist_rtx_send_get_ssrc_data (GstRistRtxSend * rtx, guint32 ssrc)
326 {
327   SSRCRtxData *data;
328   guint32 rtx_ssrc = 0;
329
330   data = g_hash_table_lookup (rtx->ssrc_data, GUINT_TO_POINTER (ssrc));
331   if (!data) {
332     /* See 5.3.2 Retransmitted Packets, orignal packet have SSRC LSB set to
333      * 0, while RTX packet have LSB set to 1 */
334     rtx_ssrc = ssrc + 1;
335     data = ssrc_rtx_data_new (rtx_ssrc);
336     g_hash_table_insert (rtx->ssrc_data, GUINT_TO_POINTER (ssrc), data);
337     g_hash_table_insert (rtx->rtx_ssrcs, GUINT_TO_POINTER (rtx_ssrc),
338         GUINT_TO_POINTER (ssrc));
339   }
340
341   return data;
342 }
343
344 /*
345  * see RIST TR-06-1 5.3.2 Retransmitted Packets
346  *
347  * RIST simply resend the packet verbatim, with SSRC+1, the defaults SSRC always
348  * have the LSB set to 0, so we can differentiate the retransmission and the
349  * normal packet.
350  */
351 static GstBuffer *
352 gst_rtp_rist_buffer_new (GstRistRtxSend * rtx, GstBuffer * buffer, guint32 ssrc)
353 {
354   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
355
356   buffer = gst_buffer_copy_deep (buffer);
357   gst_rtp_buffer_map (buffer, GST_MAP_WRITE, &rtp);
358   gst_rtp_buffer_set_ssrc (&rtp, ssrc + 1);
359   gst_rtp_buffer_unmap (&rtp);
360
361   return buffer;
362 }
363
364 static gint
365 buffer_queue_items_cmp (BufferQueueItem * a, BufferQueueItem * b,
366     gpointer user_data)
367 {
368   /* gst_rtp_buffer_compare_seqnum returns the opposite of what we want,
369    * it returns negative when seqnum1 > seqnum2 and we want negative
370    * when b > a, i.e. a is smaller, so it comes first in the sequence */
371   return gst_rtp_buffer_compare_seqnum (b->seqnum, a->seqnum);
372 }
373
374 static gboolean
375 gst_rist_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
376 {
377   GstRistRtxSend *rtx = GST_RIST_RTX_SEND (parent);
378
379   switch (GST_EVENT_TYPE (event)) {
380     case GST_EVENT_CUSTOM_UPSTREAM:
381     {
382       const GstStructure *s = gst_event_get_structure (event);
383
384       /* This event usually comes from the downstream gstrtpsession */
385       if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
386         guint seqnum = 0;
387         guint ssrc = 0;
388         GstBuffer *rtx_buf = NULL;
389
390         /* retrieve seqnum of the packet that need to be retransmitted */
391         if (!gst_structure_get_uint (s, "seqnum", &seqnum))
392           seqnum = -1;
393
394         /* retrieve ssrc of the packet that need to be retransmitted */
395         if (!gst_structure_get_uint (s, "ssrc", &ssrc))
396           ssrc = -1;
397
398         GST_DEBUG_OBJECT (rtx, "got rtx request for seqnum: %u, ssrc: %X",
399             seqnum, ssrc);
400
401         GST_OBJECT_LOCK (rtx);
402         /* check if request is for us */
403         if (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc))) {
404           SSRCRtxData *data;
405           GSequenceIter *iter;
406           BufferQueueItem search_item;
407
408           /* update statistics */
409           ++rtx->num_rtx_requests;
410
411           data = gst_rist_rtx_send_get_ssrc_data (rtx, ssrc);
412
413           search_item.seqnum = seqnum;
414           iter = g_sequence_lookup (data->queue, &search_item,
415               (GCompareDataFunc) buffer_queue_items_cmp, NULL);
416           if (iter) {
417             BufferQueueItem *item = g_sequence_get (iter);
418             GST_LOG_OBJECT (rtx, "found %u", item->seqnum);
419             rtx_buf = gst_rtp_rist_buffer_new (rtx, item->buffer, ssrc);
420           }
421 #ifndef GST_DISABLE_DEBUG
422           else {
423             BufferQueueItem *item = NULL;
424
425             iter = g_sequence_get_begin_iter (data->queue);
426             if (!g_sequence_iter_is_end (iter))
427               item = g_sequence_get (iter);
428
429             if (item && seqnum < item->seqnum) {
430               GST_DEBUG_OBJECT (rtx, "requested seqnum %u has already been "
431                   "removed from the rtx queue; the first available is %u",
432                   seqnum, item->seqnum);
433             } else {
434               GST_WARNING_OBJECT (rtx, "requested seqnum %u has not been "
435                   "transmitted yet in the original stream; either the remote end "
436                   "is not configured correctly, or the source is too slow",
437                   seqnum);
438             }
439 #endif
440           }
441         }
442         GST_OBJECT_UNLOCK (rtx);
443
444         if (rtx_buf)
445           gst_rist_rtx_send_push_out (rtx, rtx_buf);
446
447         gst_event_unref (event);
448         return TRUE;
449       }
450       break;
451     }
452     default:
453       break;
454   }
455
456   return gst_pad_event_default (pad, parent, event);
457 }
458
459 static gboolean
460 gst_rist_rtx_send_sink_event (GstPad * pad, GstObject * parent,
461     GstEvent * event)
462 {
463   GstRistRtxSend *rtx = GST_RIST_RTX_SEND (parent);
464
465   switch (GST_EVENT_TYPE (event)) {
466     case GST_EVENT_FLUSH_START:
467       gst_pad_push_event (rtx->srcpad, event);
468       gst_rist_rtx_send_set_flushing (rtx, TRUE);
469       gst_pad_pause_task (rtx->srcpad);
470       return TRUE;
471     case GST_EVENT_FLUSH_STOP:
472       gst_pad_push_event (rtx->srcpad, event);
473       gst_rist_rtx_send_set_flushing (rtx, FALSE);
474       gst_pad_start_task (rtx->srcpad,
475           (GstTaskFunction) gst_rist_rtx_send_src_loop, rtx, NULL);
476       return TRUE;
477     case GST_EVENT_EOS:
478       GST_INFO_OBJECT (rtx, "Got EOS - enqueueing it");
479       gst_rist_rtx_send_push_out (rtx, event);
480       return TRUE;
481     case GST_EVENT_CAPS:
482     {
483       GstCaps *caps;
484       GstStructure *s;
485       guint ssrc;
486       gint payload;
487       SSRCRtxData *data;
488
489       gst_event_parse_caps (event, &caps);
490
491       s = gst_caps_get_structure (caps, 0);
492       if (!gst_structure_get_uint (s, "ssrc", &ssrc))
493         ssrc = -1;
494       if (!gst_structure_get_int (s, "payload", &payload))
495         payload = -1;
496
497       if (payload == -1)
498         GST_WARNING_OBJECT (rtx, "No payload in caps");
499
500       GST_OBJECT_LOCK (rtx);
501       data = gst_rist_rtx_send_get_ssrc_data (rtx, ssrc);
502
503       GST_DEBUG_OBJECT (rtx,
504           "got caps for payload: %d->%d, ssrc: %u : %" GST_PTR_FORMAT,
505           payload, ssrc, data->rtx_ssrc, caps);
506
507       gst_structure_get_int (s, "clock-rate", &data->clock_rate);
508
509       /* The session might need to know the RTX ssrc */
510       caps = gst_caps_copy (caps);
511       gst_caps_set_simple (caps, "rtx-ssrc", G_TYPE_UINT, data->rtx_ssrc,
512           "rtx-seqnum-offset", G_TYPE_UINT, data->seqnum_base, NULL);
513
514       GST_DEBUG_OBJECT (rtx, "got clock-rate from caps: %d for ssrc: %u",
515           data->clock_rate, ssrc);
516       GST_OBJECT_UNLOCK (rtx);
517
518       gst_event_unref (event);
519       event = gst_event_new_caps (caps);
520       gst_caps_unref (caps);
521       break;
522     }
523     default:
524       break;
525   }
526
527   return gst_pad_event_default (pad, parent, event);
528 }
529
530 /* like rtp_jitter_buffer_get_ts_diff() */
531 static guint32
532 gst_rist_rtx_send_get_ts_diff (SSRCRtxData * data)
533 {
534   guint64 high_ts, low_ts;
535   BufferQueueItem *high_buf, *low_buf;
536   guint32 result;
537
538   high_buf =
539       g_sequence_get (g_sequence_iter_prev (g_sequence_get_end_iter
540           (data->queue)));
541   low_buf = g_sequence_get (g_sequence_get_begin_iter (data->queue));
542
543   if (!high_buf || !low_buf || high_buf == low_buf)
544     return 0;
545
546   high_ts = high_buf->timestamp;
547   low_ts = low_buf->timestamp;
548
549   /* it needs to work if ts wraps */
550   if (high_ts >= low_ts) {
551     result = (guint32) (high_ts - low_ts);
552   } else {
553     result = (guint32) (high_ts + G_MAXUINT32 + 1 - low_ts);
554   }
555
556   /* return value in ms instead of clock ticks */
557   return (guint32) gst_util_uint64_scale_int (result, 1000, data->clock_rate);
558 }
559
560 /* Must be called with lock */
561 static void
562 process_buffer (GstRistRtxSend * rtx, GstBuffer * buffer)
563 {
564   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
565   BufferQueueItem *item;
566   SSRCRtxData *data;
567   guint16 seqnum;
568   guint32 ssrc, rtptime;
569
570   /* read the information we want from the buffer */
571   gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
572   seqnum = gst_rtp_buffer_get_seq (&rtp);
573   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
574   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
575   gst_rtp_buffer_unmap (&rtp);
576
577   GST_TRACE_OBJECT (rtx, "Processing buffer seqnum: %u, ssrc: %X", seqnum,
578       ssrc);
579
580   data = gst_rist_rtx_send_get_ssrc_data (rtx, ssrc);
581
582   /* add current rtp buffer to queue history */
583   item = g_slice_new0 (BufferQueueItem);
584   item->seqnum = seqnum;
585   item->timestamp = rtptime;
586   item->buffer = gst_buffer_ref (buffer);
587   g_sequence_append (data->queue, item);
588
589   /* remove oldest packets from history if they are too many */
590   if (rtx->max_size_packets) {
591     while (g_sequence_get_length (data->queue) > rtx->max_size_packets)
592       g_sequence_remove (g_sequence_get_begin_iter (data->queue));
593   }
594   if (rtx->max_size_time) {
595     while (gst_rist_rtx_send_get_ts_diff (data) > rtx->max_size_time)
596       g_sequence_remove (g_sequence_get_begin_iter (data->queue));
597   }
598 }
599
600 static GstFlowReturn
601 gst_rist_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
602 {
603   GstRistRtxSend *rtx = GST_RIST_RTX_SEND (parent);
604   GstFlowReturn ret;
605
606   GST_OBJECT_LOCK (rtx);
607   process_buffer (rtx, buffer);
608   GST_OBJECT_UNLOCK (rtx);
609   ret = gst_pad_push (rtx->srcpad, buffer);
610
611   return ret;
612 }
613
614 static gboolean
615 process_buffer_from_list (GstBuffer ** buffer, guint idx, gpointer user_data)
616 {
617   process_buffer (user_data, *buffer);
618   return TRUE;
619 }
620
621 static GstFlowReturn
622 gst_rist_rtx_send_chain_list (GstPad * pad, GstObject * parent,
623     GstBufferList * list)
624 {
625   GstRistRtxSend *rtx = GST_RIST_RTX_SEND (parent);
626   GstFlowReturn ret;
627
628   GST_OBJECT_LOCK (rtx);
629   gst_buffer_list_foreach (list, process_buffer_from_list, rtx);
630   GST_OBJECT_UNLOCK (rtx);
631
632   ret = gst_pad_push_list (rtx->srcpad, list);
633
634   return ret;
635 }
636
637 static void
638 gst_rist_rtx_send_src_loop (GstRistRtxSend * rtx)
639 {
640   GstDataQueueItem *data;
641
642   if (gst_data_queue_pop (rtx->queue, &data)) {
643     GST_LOG_OBJECT (rtx, "pushing rtx buffer %p", data->object);
644
645     if (G_LIKELY (GST_IS_BUFFER (data->object))) {
646       GST_OBJECT_LOCK (rtx);
647       /* Update statistics just before pushing. */
648       rtx->num_rtx_packets++;
649       GST_OBJECT_UNLOCK (rtx);
650
651       gst_pad_push (rtx->srcpad, GST_BUFFER (data->object));
652     } else if (GST_IS_EVENT (data->object)) {
653       gst_pad_push_event (rtx->srcpad, GST_EVENT (data->object));
654
655       /* after EOS, we should not send any more buffers,
656        * even if there are more requests coming in */
657       if (GST_EVENT_TYPE (data->object) == GST_EVENT_EOS) {
658         gst_rist_rtx_send_set_flushing (rtx, TRUE);
659       }
660     } else {
661       g_assert_not_reached ();
662     }
663
664     data->object = NULL;        /* we no longer own that object */
665     data->destroy (data);
666   } else {
667     GST_LOG_OBJECT (rtx, "flushing");
668     gst_pad_pause_task (rtx->srcpad);
669   }
670 }
671
672 static gboolean
673 gst_rist_rtx_send_activate_mode (GstPad * pad, GstObject * parent,
674     GstPadMode mode, gboolean active)
675 {
676   GstRistRtxSend *rtx = GST_RIST_RTX_SEND (parent);
677   gboolean ret = FALSE;
678
679   switch (mode) {
680     case GST_PAD_MODE_PUSH:
681       if (active) {
682         gst_rist_rtx_send_set_flushing (rtx, FALSE);
683         ret = gst_pad_start_task (rtx->srcpad,
684             (GstTaskFunction) gst_rist_rtx_send_src_loop, rtx, NULL);
685       } else {
686         gst_rist_rtx_send_set_flushing (rtx, TRUE);
687         ret = gst_pad_stop_task (rtx->srcpad);
688       }
689       GST_INFO_OBJECT (rtx, "activate_mode: active %d, ret %d", active, ret);
690       break;
691     default:
692       break;
693   }
694   return ret;
695 }
696
697 static void
698 gst_rist_rtx_send_get_property (GObject * object,
699     guint prop_id, GValue * value, GParamSpec * pspec)
700 {
701   GstRistRtxSend *rtx = GST_RIST_RTX_SEND (object);
702
703   switch (prop_id) {
704     case PROP_MAX_SIZE_TIME:
705       GST_OBJECT_LOCK (rtx);
706       g_value_set_uint (value, rtx->max_size_time);
707       GST_OBJECT_UNLOCK (rtx);
708       break;
709     case PROP_MAX_SIZE_PACKETS:
710       GST_OBJECT_LOCK (rtx);
711       g_value_set_uint (value, rtx->max_size_packets);
712       GST_OBJECT_UNLOCK (rtx);
713       break;
714     case PROP_NUM_RTX_REQUESTS:
715       GST_OBJECT_LOCK (rtx);
716       g_value_set_uint (value, rtx->num_rtx_requests);
717       GST_OBJECT_UNLOCK (rtx);
718       break;
719     case PROP_NUM_RTX_PACKETS:
720       GST_OBJECT_LOCK (rtx);
721       g_value_set_uint (value, rtx->num_rtx_packets);
722       GST_OBJECT_UNLOCK (rtx);
723       break;
724     default:
725       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
726       break;
727   }
728 }
729
730 static void
731 gst_rist_rtx_send_set_property (GObject * object,
732     guint prop_id, const GValue * value, GParamSpec * pspec)
733 {
734   GstRistRtxSend *rtx = GST_RIST_RTX_SEND (object);
735
736   switch (prop_id) {
737     case PROP_MAX_SIZE_TIME:
738       GST_OBJECT_LOCK (rtx);
739       rtx->max_size_time = g_value_get_uint (value);
740       GST_OBJECT_UNLOCK (rtx);
741       break;
742     case PROP_MAX_SIZE_PACKETS:
743       GST_OBJECT_LOCK (rtx);
744       rtx->max_size_packets = g_value_get_uint (value);
745       GST_OBJECT_UNLOCK (rtx);
746       break;
747     default:
748       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
749       break;
750   }
751 }
752
753 static GstStateChangeReturn
754 gst_rist_rtx_send_change_state (GstElement * element, GstStateChange transition)
755 {
756   GstStateChangeReturn ret;
757   GstRistRtxSend *rtx = GST_RIST_RTX_SEND (element);
758
759   ret =
760       GST_ELEMENT_CLASS (gst_rist_rtx_send_parent_class)->change_state (element,
761       transition);
762
763   switch (transition) {
764     case GST_STATE_CHANGE_PAUSED_TO_READY:
765       gst_rist_rtx_send_reset (rtx);
766       break;
767     default:
768       break;
769   }
770
771   return ret;
772 }