From: Olivier CrĂȘte Date: Fri, 7 May 2010 20:42:22 +0000 (-0400) Subject: rtpmux: Aggregate incoming segments X-Git-Tag: 1.19.3~509^2~6210 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=8e58646f5c05bfdec67db1d282e856d6e7ed4265;p=platform%2Fupstream%2Fgstreamer.git rtpmux: Aggregate incoming segments --- diff --git a/gst/rtpmanager/gstrtpmux.c b/gst/rtpmanager/gstrtpmux.c index 851ea06..3c72497 100644 --- a/gst/rtpmanager/gstrtpmux.c +++ b/gst/rtpmanager/gstrtpmux.c @@ -62,6 +62,7 @@ typedef struct guint clock_base; GstCaps *out_caps; + GstSegment segment; } GstRTPMuxPadPrivate; static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src", @@ -82,6 +83,7 @@ static void gst_rtp_mux_release_pad (GstElement * element, GstPad * pad); static GstFlowReturn gst_rtp_mux_chain (GstPad * pad, GstBuffer * buffer); static gboolean gst_rtp_mux_setcaps (GstPad * pad, GstCaps * caps); static GstCaps *gst_rtp_mux_getcaps (GstPad * pad); +static gboolean gst_rtp_mux_sink_event (GstPad * pad, GstEvent * event); static GstStateChangeReturn gst_rtp_mux_change_state (GstElement * element, GstStateChange transition); @@ -219,6 +221,8 @@ gst_rtp_mux_init (GstRTPMux * object, GstRTPMuxClass * g_class) object->ssrc = DEFAULT_SSRC; object->ts_offset = DEFAULT_TIMESTAMP_OFFSET; object->seqnum_offset = DEFAULT_SEQNUM_OFFSET; + + object->segment_pending = TRUE; } static void @@ -234,15 +238,15 @@ gst_rtp_mux_setup_sinkpad (GstRTPMux * rtp_mux, GstPad * sinkpad) gst_pad_set_getcaps_function (sinkpad, gst_rtp_mux_getcaps); if (klass->chain_func) gst_pad_set_chain_function (sinkpad, klass->chain_func); - if (klass->sink_event_func) - gst_pad_set_event_function (sinkpad, klass->sink_event_func); + gst_pad_set_event_function (sinkpad, + GST_DEBUG_FUNCPTR (gst_rtp_mux_sink_event)); - /* This could break with gstreamer 0.10.9 */ - gst_pad_set_active (sinkpad, TRUE); + + gst_segment_init (&padpriv->segment, GST_FORMAT_UNDEFINED); gst_pad_set_element_private (sinkpad, padpriv); - /* dd the pad to the element */ + gst_pad_set_active (sinkpad, TRUE); gst_element_add_pad (GST_ELEMENT (rtp_mux), sinkpad); } @@ -318,6 +322,7 @@ gst_rtp_mux_chain (GstPad * pad, GstBuffer * buffer) GstRTPMux *rtp_mux; GstFlowReturn ret; GstRTPMuxPadPrivate *padpriv; + GstEvent *newseg_event = NULL; rtp_mux = GST_RTP_MUX (gst_pad_get_parent (pad)); @@ -333,15 +338,34 @@ gst_rtp_mux_chain (GstPad * pad, GstBuffer * buffer) rtp_mux->seqnum++; gst_rtp_buffer_set_seq (buffer, rtp_mux->seqnum); padpriv = gst_pad_get_element_private (pad); - if (padpriv) + if (padpriv) { gst_buffer_set_caps (buffer, padpriv->out_caps); + if (padpriv->segment.format == GST_FORMAT_TIME) + GST_BUFFER_TIMESTAMP (buffer) = + gst_segment_to_running_time (&padpriv->segment, GST_FORMAT_TIME, + GST_BUFFER_TIMESTAMP (buffer)); + } + + if (rtp_mux->segment_pending) { + /* + * We set the start at 0, because we re-timestamps to the running time + */ + newseg_event = gst_event_new_new_segment_full (FALSE, 1.0, 1.0, + GST_FORMAT_TIME, 0, -1, 0); + + rtp_mux->segment_pending = FALSE; + } GST_OBJECT_UNLOCK (rtp_mux); + gst_rtp_buffer_set_ssrc (buffer, rtp_mux->current_ssrc); gst_rtp_mux_readjust_rtp_timestamp (rtp_mux, pad, buffer); GST_LOG_OBJECT (rtp_mux, "Pushing packet size %d, seq=%d, ts=%u", GST_BUFFER_SIZE (buffer), rtp_mux->seqnum, gst_rtp_buffer_get_timestamp (buffer)); + if (newseg_event) + gst_pad_push_event (rtp_mux->srcpad, newseg_event); + if (!padpriv) { ret = GST_FLOW_NOT_LINKED; gst_buffer_unref (buffer); @@ -569,10 +593,103 @@ gst_rtp_mux_set_property (GObject * object, } } +static gboolean +gst_rtp_mux_sink_event (GstPad * pad, GstEvent * event) +{ + + GstRTPMux *mux; + gboolean ret = FALSE; + gboolean forward = TRUE; + + mux = GST_RTP_MUX (gst_pad_get_parent (pad)); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_FLUSH_STOP: + { + GstRTPMuxPadPrivate *padpriv; + + GST_OBJECT_LOCK (mux); + mux->segment_pending = TRUE; + padpriv = gst_pad_get_element_private (pad); + if (padpriv) + gst_segment_init (&padpriv->segment, GST_FORMAT_UNDEFINED); + GST_OBJECT_UNLOCK (pad); + } + break; + case GST_EVENT_NEWSEGMENT: + { + gboolean update; + gdouble rate, applied_rate; + GstFormat format; + gint64 start, stop, position; + GstRTPMuxPadPrivate *padpriv; + + gst_event_parse_new_segment_full (event, &update, &rate, &applied_rate, + &format, &start, &stop, &position); + + GST_OBJECT_LOCK (mux); + padpriv = gst_pad_get_element_private (pad); + + if (padpriv) { + if (format == GST_FORMAT_TIME) + gst_segment_set_newsegment_full (&padpriv->segment, update, + rate, applied_rate, format, start, stop, position); + else + gst_segment_init (&padpriv->segment, GST_FORMAT_UNDEFINED); + } + GST_OBJECT_UNLOCK (mux); + gst_event_unref (event); + forward = FALSE; + ret = TRUE; + break; + } + default: + break; + } + + if (forward) { + GstRTPMuxClass *klass = GST_RTP_MUX_GET_CLASS (mux); + + if (klass->sink_event_func) + klass->sink_event_func (pad, event); + else + ret = gst_pad_push_event (mux->srcpad, event); + } + + gst_object_unref (mux); + return ret; +} + + +static void +clear_segment (gpointer data, gpointer user_data) +{ + GstPad *pad = data; + GstRTPMux *mux = user_data; + GstRTPMuxPadPrivate *padpriv; + + GST_OBJECT_LOCK (mux); + padpriv = gst_pad_get_element_private (pad); + if (padpriv) + gst_segment_init (&padpriv->segment, GST_FORMAT_UNDEFINED); + GST_OBJECT_UNLOCK (mux); + + gst_object_unref (pad); +} + + static void gst_rtp_mux_ready_to_paused (GstRTPMux * rtp_mux) { + GstIterator *iter; + + iter = gst_element_iterate_sink_pads (GST_ELEMENT (rtp_mux)); + while (gst_iterator_foreach (iter, clear_segment, rtp_mux) == + GST_ITERATOR_RESYNC); + gst_iterator_free (iter); + GST_OBJECT_LOCK (rtp_mux); + rtp_mux->segment_pending = TRUE; if (rtp_mux->ssrc == -1) rtp_mux->current_ssrc = g_random_int (); @@ -589,6 +706,7 @@ gst_rtp_mux_ready_to_paused (GstRTPMux * rtp_mux) rtp_mux->ts_base = g_random_int (); else rtp_mux->ts_base = rtp_mux->ts_offset; + GST_DEBUG_OBJECT (rtp_mux, "set clock-base to %u", rtp_mux->ts_base); GST_OBJECT_UNLOCK (rtp_mux); diff --git a/gst/rtpmanager/gstrtpmux.h b/gst/rtpmanager/gstrtpmux.h index 77a394b..8be74ad 100644 --- a/gst/rtpmanager/gstrtpmux.h +++ b/gst/rtpmanager/gstrtpmux.h @@ -58,6 +58,8 @@ struct _GstRTPMux guint16 seqnum; /* protected by object lock */ guint ssrc; guint current_ssrc; + + gboolean segment_pending; }; struct _GstRTPMuxClass diff --git a/tests/check/elements/rtpmux.c b/tests/check/elements/rtpmux.c index b37522c..5c55634 100644 --- a/tests/check/elements/rtpmux.c +++ b/tests/check/elements/rtpmux.c @@ -60,6 +60,14 @@ setcaps_func (GstPad * pad, GstCaps * caps) return TRUE; } +static gboolean +event_func (GstPad * pad, GstEvent * event) +{ + gst_event_unref (event); + + return TRUE; +} + static void test_basic (const gchar * elem_name, int count, check_cb cb) { @@ -74,6 +82,7 @@ test_basic (const gchar * elem_name, int count, check_cb cb) GstCaps *src2caps = NULL; GstCaps *sinkcaps = NULL; GstCaps *caps; + GstEvent *newsegment; int i; rtpmux = gst_check_setup_element (elem_name); @@ -92,6 +101,7 @@ test_basic (const gchar * elem_name, int count, check_cb cb) gst_pad_set_getcaps_function (src2, getcaps_func); gst_pad_set_getcaps_function (sink, getcaps_func); gst_pad_set_setcaps_function (sink, setcaps_func); + gst_pad_set_event_function (sink, event_func); g_object_set_data (G_OBJECT (src1), "caps", &src1caps); g_object_set_data (G_OBJECT (src2), "caps", &src2caps); g_object_set_data (G_OBJECT (sink), "caps", &sinkcaps); @@ -130,8 +140,17 @@ test_basic (const gchar * elem_name, int count, check_cb cb) "ssrc", G_TYPE_UINT, 66, NULL); fail_unless (gst_pad_set_caps (src1, caps)); + newsegment = gst_event_new_new_segment (FALSE, 1, GST_FORMAT_TIME, + 100000, -1, 0); + fail_unless (gst_pad_push_event (src1, newsegment)); + newsegment = gst_event_new_new_segment (FALSE, 1, GST_FORMAT_TIME, + 50000, -1, 0); + fail_unless (gst_pad_push_event (src2, newsegment)); + for (i = 0; i < count; i++) { inbuf = gst_rtp_buffer_new_allocate (10, 0, 0); + GST_BUFFER_TIMESTAMP (inbuf) = i * 1000 + 100000; + GST_BUFFER_DURATION (inbuf) = 1000; gst_buffer_set_caps (inbuf, caps); gst_rtp_buffer_set_version (inbuf, 2); gst_rtp_buffer_set_payload_type (inbuf, 98); @@ -140,6 +159,10 @@ test_basic (const gchar * elem_name, int count, check_cb cb) gst_rtp_buffer_set_seq (inbuf, 2000 + i); fail_unless (gst_pad_push (src1, inbuf) == GST_FLOW_OK); + if (buffers) + fail_unless (GST_BUFFER_TIMESTAMP (buffers->data) == i * 1000, "%lld", + GST_BUFFER_TIMESTAMP (buffers->data)); + cb (src2, i); g_list_foreach (buffers, (GFunc) gst_buffer_unref, NULL);