/**
* SECTION:element-rtprtxqueue
+ * @title: rtprtxqueue
+ *
+ * rtprtxqueue maintains a queue of transmitted RTP packets, up to a
+ * configurable limit (see #GstRTPRtxQueue:max-size-time,
+ * #GstRTPRtxQueue:max-size-packets), and retransmits them upon request
+ * from the downstream rtpsession (GstRTPRetransmissionRequest event).
+ *
+ * This element is similar to rtprtxsend, but it has differences:
+ * - Retransmission from rtprtxqueue is not RFC 4588 compliant. The
+ * retransmitted packets have the same ssrc and payload type as the original
+ * stream.
+ * - As a side-effect of the above, rtprtxqueue does not require the use of
+ * rtprtxreceive on the receiving end. rtpjitterbuffer alone is able to
+ * reconstruct the stream.
+ * - Retransmission from rtprtxqueue happens as soon as the next regular flow
+ * packet is chained, while rtprtxsend retransmits as soon as the retransmission
+ * event is received, using a helper thread.
+ * - rtprtxqueue can be used with rtpbin without the need of hooking to its
+ * #GstRtpBin::request-aux-sender signal, which means it can be used with
+ * rtpbin using gst-launch.
+ *
+ * See also #GstRtpRtxSend, #GstRtpRtxReceive
+ *
+ * # Example pipelines
+ *
+ * |[
+ * gst-launch-1.0 rtpbin name=b rtp-profile=avpf \
+ * audiotestsrc is-live=true ! opusenc ! rtpopuspay pt=96 ! rtprtxqueue ! b.send_rtp_sink_0 \
+ * b.send_rtp_src_0 ! identity drop-probability=0.01 ! udpsink host="127.0.0.1" port=5000 \
+ * udpsrc port=5001 ! b.recv_rtcp_sink_0 \
+ * b.send_rtcp_src_0 ! udpsink host="127.0.0.1" port=5002 sync=false async=false
+ * ]|
+ * Sender pipeline
+ *
+ * |[
+ * gst-launch-1.0 rtpbin name=b rtp-profile=avpf do-retransmission=true \
+ * udpsrc port=5000 caps="application/x-rtp,media=(string)audio,clock-rate=(int)48000,encoding-name=(string)OPUS,payload=(int)96" ! \
+ * b.recv_rtp_sink_0 \
+ * b. ! rtpopusdepay ! opusdec ! audioconvert ! audioresample ! autoaudiosink \
+ * udpsrc port=5002 ! b.recv_rtcp_sink_0 \
+ * b.send_rtcp_src_0 ! udpsink host="127.0.0.1" port=5001 sync=false async=false
+ * ]|
+ * Receiver pipeline
*/
#ifdef HAVE_CONFIG_H
GST_DEBUG_CATEGORY_STATIC (gst_rtp_rtx_queue_debug);
#define GST_CAT_DEFAULT gst_rtp_rtx_queue_debug
+#define DEFAULT_MAX_SIZE_TIME 0
+#define DEFAULT_MAX_SIZE_PACKETS 100
+
enum
{
PROP_0,
- PROP_LAST
+ PROP_MAX_SIZE_TIME,
+ PROP_MAX_SIZE_PACKETS,
+ PROP_REQUESTS,
+ PROP_FULFILLED_REQUESTS,
};
static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
static gboolean gst_rtp_rtx_queue_src_event (GstPad * pad, GstObject * parent,
GstEvent * event);
+static gboolean gst_rtp_rtx_queue_sink_event (GstPad * pad, GstObject * parent,
+ GstEvent * event);
static GstFlowReturn gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent,
GstBuffer * buffer);
+static GstFlowReturn gst_rtp_rtx_queue_chain_list (GstPad * pad,
+ GstObject * parent, GstBufferList * list);
static GstStateChangeReturn gst_rtp_rtx_queue_change_state (GstElement *
element, GstStateChange transition);
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&src_factory));
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&sink_factory));
+ gobject_class->get_property = gst_rtp_rtx_queue_get_property;
+ gobject_class->set_property = gst_rtp_rtx_queue_set_property;
+ gobject_class->finalize = gst_rtp_rtx_queue_finalize;
+
+ g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
+ g_param_spec_uint ("max-size-time", "Max Size Times",
+ "Amount of ms to queue (0 = unlimited)", 0, G_MAXUINT,
+ DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_MAX_SIZE_PACKETS,
+ g_param_spec_uint ("max-size-packets", "Max Size Packets",
+ "Amount of packets to queue (0 = unlimited)", 0, G_MAXUINT,
+ DEFAULT_MAX_SIZE_PACKETS,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_REQUESTS,
+ g_param_spec_uint ("requests", "Requests",
+ "Total number of retransmission requests", 0, G_MAXUINT,
+ 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_FULFILLED_REQUESTS,
+ g_param_spec_uint ("fulfilled-requests", "Fulfilled Requests",
+ "Number of fulfilled retransmission requests", 0, G_MAXUINT,
+ 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
+ gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
+ gst_element_class_add_static_pad_template (gstelement_class, &sink_factory);
gst_element_class_set_static_metadata (gstelement_class,
"RTP Retransmission Queue", "Codec",
"Keep RTP packets in a queue for retransmission",
"Wim Taymans <wim.taymans@gmail.com>");
- gobject_class->get_property = gst_rtp_rtx_queue_get_property;
- gobject_class->set_property = gst_rtp_rtx_queue_set_property;
- gobject_class->finalize = gst_rtp_rtx_queue_finalize;
-
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_change_state);
}
g_list_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL);
g_list_free (rtx->pending);
rtx->pending = NULL;
+ rtx->n_requests = 0;
+ rtx->n_fulfilled_requests = 0;
g_mutex_unlock (&rtx->lock);
}
"sink"), "sink");
GST_PAD_SET_PROXY_CAPS (rtx->sinkpad);
GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
+ gst_pad_set_event_function (rtx->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_sink_event));
gst_pad_set_chain_function (rtx->sinkpad,
GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain));
+ gst_pad_set_chain_list_function (rtx->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain_list));
gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
rtx->queue = g_queue_new ();
g_mutex_init (&rtx->lock);
+
+ rtx->max_size_time = DEFAULT_MAX_SIZE_TIME;
+ rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS;
}
typedef struct
if (data->found)
return;
- if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpbuffer))
+ if (!GST_IS_BUFFER (buffer) ||
+ !gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpbuffer))
return;
seqnum = gst_rtp_buffer_get_seq (&rtpbuffer);
data.rtx = rtx;
data.seqnum = seqnum;
data.found = FALSE;
+ rtx->n_requests += 1;
g_queue_foreach (rtx->queue, (GFunc) push_seqnum, &data);
g_mutex_unlock (&rtx->lock);
return res;
}
+static gboolean
+gst_rtp_rtx_queue_sink_event (GstPad * pad, GstObject * parent,
+ GstEvent * event)
+{
+ GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (parent);
+ gboolean res;
+
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_SEGMENT:
+ {
+ g_mutex_lock (&rtx->lock);
+ gst_event_copy_segment (event, &rtx->head_segment);
+ g_queue_push_head (rtx->queue, gst_event_ref (event));
+ g_mutex_unlock (&rtx->lock);
+ /* fall through */
+ }
+ default:
+ res = gst_pad_event_default (pad, parent, event);
+ break;
+ }
+ return res;
+}
+
static void
do_push (GstBuffer * buffer, GstRTPRtxQueue * rtx)
{
+ rtx->n_fulfilled_requests += 1;
gst_pad_push (rtx->srcpad, buffer);
}
+static guint32
+get_ts_diff (GstRTPRtxQueue * rtx)
+{
+ GstClockTime high_ts, low_ts;
+ GstClockTimeDiff result;
+ GstBuffer *high_buf, *low_buf;
+
+ high_buf = g_queue_peek_head (rtx->queue);
+
+ while (GST_IS_EVENT ((low_buf = g_queue_peek_tail (rtx->queue)))) {
+ GstEvent *event = g_queue_pop_tail (rtx->queue);
+ gst_event_copy_segment (event, &rtx->tail_segment);
+ gst_event_unref (event);
+ }
+
+ if (!high_buf || !low_buf || high_buf == low_buf)
+ return 0;
+
+ high_ts = GST_BUFFER_TIMESTAMP (high_buf);
+ low_ts = GST_BUFFER_TIMESTAMP (low_buf);
+
+ high_ts = gst_segment_to_running_time (&rtx->head_segment, GST_FORMAT_TIME,
+ high_ts);
+ low_ts = gst_segment_to_running_time (&rtx->tail_segment, GST_FORMAT_TIME,
+ low_ts);
+
+ result = high_ts - low_ts;
+
+ /* return value in ms instead of ns */
+ return (guint32) gst_util_uint64_scale_int (result, 1, GST_MSECOND);
+}
+
+/* Must be called with rtx->lock */
+static void
+shrink_queue (GstRTPRtxQueue * rtx)
+{
+ if (rtx->max_size_packets) {
+ while (g_queue_get_length (rtx->queue) > rtx->max_size_packets)
+ gst_buffer_unref (g_queue_pop_tail (rtx->queue));
+ }
+ if (rtx->max_size_time) {
+ while (get_ts_diff (rtx) > rtx->max_size_time)
+ gst_buffer_unref (g_queue_pop_tail (rtx->queue));
+ }
+}
+
static GstFlowReturn
gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
g_mutex_lock (&rtx->lock);
g_queue_push_head (rtx->queue, gst_buffer_ref (buffer));
- while (g_queue_get_length (rtx->queue) > 100) {
- gst_buffer_unref (g_queue_pop_tail (rtx->queue));
- }
+ shrink_queue (rtx);
+
pending = rtx->pending;
rtx->pending = NULL;
g_mutex_unlock (&rtx->lock);
+ pending = g_list_reverse (pending);
g_list_foreach (pending, (GFunc) do_push, rtx);
g_list_free (pending);
return ret;
}
+static gboolean
+push_to_queue (GstBuffer ** buffer, guint idx, gpointer user_data)
+{
+ GQueue *queue = user_data;
+
+ g_queue_push_head (queue, gst_buffer_ref (*buffer));
+
+ return TRUE;
+}
+
+static GstFlowReturn
+gst_rtp_rtx_queue_chain_list (GstPad * pad, GstObject * parent,
+ GstBufferList * list)
+{
+ GstRTPRtxQueue *rtx;
+ GstFlowReturn ret;
+ GList *pending;
+
+ rtx = GST_RTP_RTX_QUEUE (parent);
+
+ g_mutex_lock (&rtx->lock);
+ gst_buffer_list_foreach (list, push_to_queue, rtx->queue);
+ shrink_queue (rtx);
+
+ pending = rtx->pending;
+ rtx->pending = NULL;
+ g_mutex_unlock (&rtx->lock);
+
+ pending = g_list_reverse (pending);
+ g_list_foreach (pending, (GFunc) do_push, rtx);
+ g_list_free (pending);
+
+ ret = gst_pad_push_list (rtx->srcpad, list);
+
+ return ret;
+}
+
static void
gst_rtp_rtx_queue_get_property (GObject * object,
guint prop_id, GValue * value, GParamSpec * pspec)
{
+ GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (object);
+
switch (prop_id) {
+ case PROP_MAX_SIZE_TIME:
+ g_value_set_uint (value, rtx->max_size_time);
+ break;
+ case PROP_MAX_SIZE_PACKETS:
+ g_value_set_uint (value, rtx->max_size_packets);
+ break;
+ case PROP_REQUESTS:
+ g_value_set_uint (value, rtx->n_requests);
+ break;
+ case PROP_FULFILLED_REQUESTS:
+ g_value_set_uint (value, rtx->n_fulfilled_requests);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
gst_rtp_rtx_queue_set_property (GObject * object,
guint prop_id, const GValue * value, GParamSpec * pspec)
{
+ GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (object);
+
switch (prop_id) {
+ case PROP_MAX_SIZE_TIME:
+ rtx->max_size_time = g_value_get_uint (value);
+ break;
+ case PROP_MAX_SIZE_PACKETS:
+ rtx->max_size_packets = g_value_get_uint (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;