From: Wim Taymans Date: Tue, 10 Dec 2013 10:57:37 +0000 (+0100) Subject: rtpjitterbuffer: serialize events in the buffer X-Git-Tag: 1.3.1~503 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=eee515cb2c1367b4bd60f3e19251f42c3edd6ffe;hp=36e78bc5caed49950d0cc6b320b9992cbc1c62eb;p=platform%2Fupstream%2Fgst-plugins-good.git rtpjitterbuffer: serialize events in the buffer Serialize events into the jitterbuffer by inserting them with a -1 seqnum. Update unit test to expect events from the streaming thread. Fixes https://bugzilla.gnome.org/show_bug.cgi?id=652986 --- diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index 1500120..464bb09 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -730,6 +730,7 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer) #define ITEM_TYPE_BUFFER 0 #define ITEM_TYPE_LOST 1 +#define ITEM_TYPE_EVENT 2 static RTPJitterBufferItem * alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts, @@ -1303,18 +1304,13 @@ gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent, return ret; } +/* handles and stores the event in the jitterbuffer, must be called with + * LOCK */ static gboolean -gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent, - GstEvent * event) +queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event) { - gboolean ret = TRUE; - GstRtpJitterBuffer *jitterbuffer; - GstRtpJitterBufferPrivate *priv; - - jitterbuffer = GST_RTP_JITTER_BUFFER (parent); - priv = jitterbuffer->priv; - - GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event)); + GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; + RTPJitterBufferItem *item; switch (GST_EVENT_TYPE (event)) { case GST_EVENT_CAPS: @@ -1322,20 +1318,12 @@ gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent, GstCaps *caps; gst_event_parse_caps (event, &caps); + if (!gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps)) + goto wrong_caps; - JBUF_LOCK (priv); - ret = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps); - JBUF_UNLOCK (priv); - - /* set same caps on srcpad on success */ - if (ret) - ret = gst_pad_push_event (priv->srcpad, event); - else - gst_event_unref (event); break; } case GST_EVENT_SEGMENT: - { gst_event_copy_segment (event, &priv->segment); /* we need time for now */ @@ -1344,11 +1332,51 @@ gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent, GST_DEBUG_OBJECT (jitterbuffer, "newsegment: %" GST_SEGMENT_FORMAT, &priv->segment); - - /* FIXME, push SEGMENT in the queue. Sorting order might be difficult. */ - ret = gst_pad_push_event (priv->srcpad, event); break; - } + case GST_EVENT_EOS: + priv->eos = TRUE; + break; + default: + break; + } + + + GST_DEBUG_OBJECT (jitterbuffer, "adding event"); + item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1); + rtp_jitter_buffer_insert (priv->jbuf, item, NULL, NULL); + JBUF_SIGNAL_EVENT (priv); + + return TRUE; + + /* ERRORS */ +wrong_caps: + { + GST_DEBUG_OBJECT (jitterbuffer, "received invalid caps"); + gst_event_unref (event); + return FALSE; + } +newseg_wrong_format: + { + GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment"); + gst_event_unref (event); + return FALSE; + } +} + +static gboolean +gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent, + GstEvent * event) +{ + gboolean ret = TRUE; + GstRtpJitterBuffer *jitterbuffer; + GstRtpJitterBufferPrivate *priv; + + jitterbuffer = GST_RTP_JITTER_BUFFER (parent); + priv = jitterbuffer->priv; + + GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event)); + + switch (GST_EVENT_TYPE (event)) { case GST_EVENT_FLUSH_START: ret = gst_pad_push_event (priv->srcpad, event); gst_rtp_jitter_buffer_flush_start (jitterbuffer); @@ -1361,43 +1389,49 @@ gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent, gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent, GST_PAD_MODE_PUSH, TRUE); break; - case GST_EVENT_EOS: - { - /* push EOS in queue. We always push it at the head */ - JBUF_LOCK (priv); - /* check for flushing, we need to discard the event and return FALSE when - * we are flushing */ - ret = priv->srcresult == GST_FLOW_OK; - if (ret && !priv->eos) { - GST_INFO_OBJECT (jitterbuffer, "queuing EOS"); - priv->eos = TRUE; - JBUF_SIGNAL_EVENT (priv); - } else if (priv->eos) { - GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS"); + default: + if (GST_EVENT_IS_SERIALIZED (event)) { + /* serialized events go in the queue */ + JBUF_LOCK (priv); + if (priv->srcresult != GST_FLOW_OK) { + /* Errors in sticky event pushing are no problem and ignored here + * as they will cause more meaningful errors during data flow. + * For EOS events, that are not followed by data flow, we still + * return FALSE here though. + */ + if (!GST_EVENT_IS_STICKY (event) || + GST_EVENT_TYPE (event) == GST_EVENT_EOS) + goto out_flow_error; + } + /* refuse more events on EOS */ + if (priv->eos) + goto out_eos; + ret = queue_event (jitterbuffer, event); + JBUF_UNLOCK (priv); } else { - GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s", - gst_flow_get_name (priv->srcresult)); + /* non-serialized events are forwarded downstream immediately */ + ret = gst_pad_push_event (priv->srcpad, event); } - JBUF_UNLOCK (priv); - gst_event_unref (event); - break; - } - default: - ret = gst_pad_push_event (priv->srcpad, event); break; } - -done: - return ret; /* ERRORS */ -newseg_wrong_format: +out_flow_error: { - GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment"); - ret = FALSE; + GST_DEBUG_OBJECT (jitterbuffer, + "refusing event, we have a downstream flow error: %s", + gst_flow_get_name (priv->srcresult)); + JBUF_UNLOCK (priv); gst_event_unref (event); - goto done; + return FALSE; + } +out_eos: + { + GST_DEBUG_OBJECT (jitterbuffer, "refusing event, we are EOS"); + JBUF_UNLOCK (priv); + gst_event_unref (event); + return FALSE; } } diff --git a/tests/check/elements/rtpjitterbuffer.c b/tests/check/elements/rtpjitterbuffer.c index edda166..2169718 100644 --- a/tests/check/elements/rtpjitterbuffer.c +++ b/tests/check/elements/rtpjitterbuffer.c @@ -536,8 +536,12 @@ setup_testharness (TestData * data) gst_pad_set_caps (data->test_src_pad, generate_caps ()); gst_pad_push_event (data->test_src_pad, gst_event_new_segment (&seg)); - while ((obj = g_async_queue_try_pop (data->sink_event_queue))) - gst_mini_object_unref (obj); + obj = g_async_queue_pop (data->sink_event_queue); + gst_mini_object_unref (obj); + obj = g_async_queue_pop (data->sink_event_queue); + gst_mini_object_unref (obj); + obj = g_async_queue_pop (data->sink_event_queue); + gst_mini_object_unref (obj); } static void