X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst%2Frtpmanager%2Fgstrtpsession.c;h=04fca371610cfa3c3942fa716a34fae0931e6fbe;hb=5d79f4bddce27b92f1a9afa02b40e074a676ac92;hp=dc2d364df0f40102f6bc9b53adf787e3d2ddc3c5;hpb=1cebcfa8c238affe188c9411fd54b633e1526b97;p=platform%2Fupstream%2Fgst-plugins-good.git diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c index dc2d364..04fca37 100644 --- a/gst/rtpmanager/gstrtpsession.c +++ b/gst/rtpmanager/gstrtpsession.c @@ -13,15 +13,15 @@ * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. */ /** - * SECTION:element-gstrtpsession - * @see_also: gstrtpjitterbuffer, gstrtpbin, gstrtpptdemux, gstrtpssrcdemux + * SECTION:element-rtpsession + * @see_also: rtpjitterbuffer, rtpbin, rtpptdemux, rtpssrcdemux * - * The RTP session manager models one participant with a unique SSRC in an RTP + * The RTP session manager models participants with unique SSRC in an RTP * session. This session can be used to send and receive RTP and RTCP packets. * Based on what REQUEST pads are requested from the session manager, specific * functionality can be activated. @@ -40,9 +40,12 @@ * * Scheduling of RR/SR RTCP packets. * + * + * Support for multiple sender SSRC. + * * * - * The gstrtpsession will not demux packets based on SSRC or payload type, nor will + * The rtpsession will not demux packets based on SSRC or payload type, nor will * it correct for packet reordering and jitter. Use #GstRtpsSrcDemux, * #GstRtpPtDemux and GstRtpJitterBuffer in addition to #GstRtpSession to * perform these tasks. It is usually a good idea to use #GstRtpBin, which @@ -64,9 +67,8 @@ * that should be sent to all participants in the session. * * To use #GstRtpSession as a sender, request a send_rtp_sink pad, which will - * automatically create a send_rtp_src pad. The session manager will modify the - * SSRC in the RTP packets to its own SSRC and wil forward the packets on the - * send_rtp_src pad after updating its internal state. + * automatically create a send_rtp_src pad. The session manager will + * forward the packets on the send_rtp_src pad after updating its internal state. * * The session manager needs the clock-rate of the payload types it is handling * and will signal the #GstRtpSession::request-pt-map signal when it needs such a @@ -76,13 +78,13 @@ * * Example pipelines * |[ - * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink gstrtpsession .recv_rtp_src ! rtptheoradepay ! theoradec ! xvimagesink + * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink rtpsession .recv_rtp_src ! rtptheoradepay ! theoradec ! xvimagesink * ]| Receive theora RTP packets from port 5000 and send them to the depayloader, * decoder and display. Note that the application/x-rtp caps on udpsrc should be * configured based on some negotiation process such as RTSP for this pipeline * to work correctly. * |[ - * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink gstrtpsession name=session \ + * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink rtpsession name=session \ * .recv_rtp_src ! rtptheoradepay ! theoradec ! xvimagesink \ * udpsrc port=5001 caps="application/x-rtcp" ! session.recv_rtcp_sink * ]| Receive theora RTP packets from port 5000 and send them to the depayloader, @@ -92,11 +94,11 @@ * configured based on some negotiation process such as RTSP for this pipeline * to work correctly. * |[ - * gst-launch-1.0 videotestsrc ! theoraenc ! rtptheorapay ! .send_rtp_sink gstrtpsession .send_rtp_src ! udpsink port=5000 + * gst-launch-1.0 videotestsrc ! theoraenc ! rtptheorapay ! .send_rtp_sink rtpsession .send_rtp_src ! udpsink port=5000 * ]| Send theora RTP packets through the session manager and out on UDP port * 5000. * |[ - * gst-launch-1.0 videotestsrc ! theoraenc ! rtptheorapay ! .send_rtp_sink gstrtpsession name=session .send_rtp_src \ + * gst-launch-1.0 videotestsrc ! theoraenc ! rtptheorapay ! .send_rtp_sink rtpsession name=session .send_rtp_src \ * ! udpsink port=5000 session.send_rtcp_src ! udpsink port=5001 * ]| Send theora RTP packets through the session manager and out on UDP port * 5000. Send RTCP packets on port 5001. Note that this pipeline will not preroll @@ -104,8 +106,6 @@ * packets are sent in the PAUSED state). Applications should manually set and * keep (see gst_element_set_locked_state()) the RTCP udpsink to the PLAYING state. * - * - * Last reviewed on 2007-05-28 (0.10.5) */ #ifdef HAVE_CONFIG_H @@ -116,13 +116,32 @@ #include -#include "gstrtpbin-marshal.h" #include "gstrtpsession.h" #include "rtpsession.h" GST_DEBUG_CATEGORY_STATIC (gst_rtp_session_debug); #define GST_CAT_DEFAULT gst_rtp_session_debug +GType +gst_rtp_ntp_time_source_get_type (void) +{ + static GType type = 0; + static const GEnumValue values[] = { + {GST_RTP_NTP_TIME_SOURCE_NTP, "NTP time based on realtime clock", "ntp"}, + {GST_RTP_NTP_TIME_SOURCE_UNIX, "UNIX time based on realtime clock", "unix"}, + {GST_RTP_NTP_TIME_SOURCE_RUNNING_TIME, + "Running time based on pipeline clock", + "running-time"}, + {GST_RTP_NTP_TIME_SOURCE_CLOCK_TIME, "Pipeline clock time", "clock-time"}, + {0, NULL, NULL}, + }; + + if (!type) { + type = g_enum_register_static ("GstRtpNtpTimeSource", values); + } + return type; +} + /* sink pads */ static GstStaticPadTemplate rtpsession_recv_rtp_sink_template = GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink", @@ -189,11 +208,13 @@ enum SIGNAL_ON_BYE_TIMEOUT, SIGNAL_ON_TIMEOUT, SIGNAL_ON_SENDER_TIMEOUT, + SIGNAL_ON_NEW_SENDER_SSRC, + SIGNAL_ON_SENDER_SSRC_ACTIVE, LAST_SIGNAL }; -#define DEFAULT_BANDWIDTH RTP_STATS_BANDWIDTH -#define DEFAULT_RTCP_FRACTION (RTP_STATS_BANDWIDTH * RTP_STATS_RTCP_FRACTION) +#define DEFAULT_BANDWIDTH 0 +#define DEFAULT_RTCP_FRACTION RTP_STATS_RTCP_FRACTION #define DEFAULT_RTCP_RR_BANDWIDTH -1 #define DEFAULT_RTCP_RS_BANDWIDTH -1 #define DEFAULT_SDES NULL @@ -202,6 +223,11 @@ enum #define DEFAULT_USE_PIPELINE_CLOCK FALSE #define DEFAULT_RTCP_MIN_INTERVAL (RTP_STATS_MIN_INTERVAL * GST_SECOND) #define DEFAULT_PROBATION RTP_DEFAULT_PROBATION +#define DEFAULT_MAX_DROPOUT_TIME 60000 +#define DEFAULT_MAX_MISORDER_TIME 2000 +#define DEFAULT_RTP_PROFILE GST_RTP_PROFILE_AVP +#define DEFAULT_NTP_TIME_SOURCE GST_RTP_NTP_TIME_SOURCE_NTP +#define DEFAULT_RTCP_SYNC_SEND_TIME TRUE enum { @@ -217,18 +243,24 @@ enum PROP_USE_PIPELINE_CLOCK, PROP_RTCP_MIN_INTERVAL, PROP_PROBATION, - PROP_LAST + PROP_MAX_DROPOUT_TIME, + PROP_MAX_MISORDER_TIME, + PROP_STATS, + PROP_RTP_PROFILE, + PROP_NTP_TIME_SOURCE, + PROP_RTCP_SYNC_SEND_TIME }; -#define GST_RTP_SESSION_GET_PRIVATE(obj) \ - (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_SESSION, GstRtpSessionPrivate)) - #define GST_RTP_SESSION_LOCK(sess) g_mutex_lock (&(sess)->priv->lock) #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock (&(sess)->priv->lock) +#define GST_RTP_SESSION_WAIT(sess) g_cond_wait (&(sess)->priv->cond, &(sess)->priv->lock) +#define GST_RTP_SESSION_SIGNAL(sess) g_cond_signal (&(sess)->priv->cond) + struct _GstRtpSessionPrivate { GMutex lock; + GCond cond; GstClock *sysclock; RTPSession *session; @@ -238,11 +270,19 @@ struct _GstRtpSessionPrivate gboolean stop_thread; GThread *thread; gboolean thread_stopped; + gboolean wait_send; /* caps mapping */ GHashTable *ptmap; + GstClockTime send_latency; + gboolean use_pipeline_clock; + GstRtpNtpTimeSource ntp_time_source; + gboolean rtcp_sync_send_time; + + guint recv_rtx_req_count; + guint sent_rtx_req_count; }; /* callbacks to handle actions from the session manager */ @@ -253,14 +293,27 @@ static GstFlowReturn gst_rtp_session_send_rtp (RTPSession * sess, static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src, GstBuffer * buffer, gboolean eos, gpointer user_data); static GstFlowReturn gst_rtp_session_sync_rtcp (RTPSession * sess, - RTPSource * src, GstBuffer * buffer, gpointer user_data); + GstBuffer * buffer, gpointer user_data); static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload, gpointer user_data); static void gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data); -static void gst_rtp_session_request_key_unit (RTPSession * sess, +static void gst_rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc, gboolean all_headers, gpointer user_data); static GstClockTime gst_rtp_session_request_time (RTPSession * session, gpointer user_data); +static void gst_rtp_session_notify_nack (RTPSession * sess, + guint16 seqnum, guint16 blp, guint32 ssrc, gpointer user_data); +static void gst_rtp_session_reconfigure (RTPSession * sess, gpointer user_data); +static void gst_rtp_session_notify_early_rtcp (RTPSession * sess, + gpointer user_data); +static GstFlowReturn gst_rtp_session_chain_recv_rtp (GstPad * pad, + GstObject * parent, GstBuffer * buffer); +static GstFlowReturn gst_rtp_session_chain_recv_rtcp (GstPad * pad, + GstObject * parent, GstBuffer * buffer); +static GstFlowReturn gst_rtp_session_chain_send_rtp (GstPad * pad, + GstObject * parent, GstBuffer * buffer); +static GstFlowReturn gst_rtp_session_chain_send_rtp_list (GstPad * pad, + GstObject * parent, GstBufferList * list); static RTPSessionCallbacks callbacks = { gst_rtp_session_process_rtp, @@ -270,7 +323,10 @@ static RTPSessionCallbacks callbacks = { gst_rtp_session_clock_rate, gst_rtp_session_reconsider, gst_rtp_session_request_key_unit, - gst_rtp_session_request_time + gst_rtp_session_request_time, + gst_rtp_session_notify_nack, + gst_rtp_session_reconfigure, + gst_rtp_session_notify_early_rtcp }; /* GObject vmethods */ @@ -294,6 +350,8 @@ static gboolean gst_rtp_session_setcaps_send_rtp (GstPad * pad, static void gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession); +static GstStructure *gst_rtp_session_create_stats (GstRtpSession * rtpsession); + static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 }; static void @@ -306,8 +364,39 @@ on_new_ssrc (RTPSession * session, RTPSource * src, GstRtpSession * sess) static void on_ssrc_collision (RTPSession * session, RTPSource * src, GstRtpSession * sess) { + GstPad *send_rtp_sink; + g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0, src->ssrc); + + GST_RTP_SESSION_LOCK (sess); + if ((send_rtp_sink = sess->send_rtp_sink)) + gst_object_ref (send_rtp_sink); + GST_RTP_SESSION_UNLOCK (sess); + + if (send_rtp_sink) { + GstStructure *structure; + GstEvent *event; + RTPSource *internal_src; + guint32 suggested_ssrc; + + structure = gst_structure_new ("GstRTPCollision", "ssrc", G_TYPE_UINT, + (guint) src->ssrc, NULL); + + /* if there is no source using the suggested ssrc, most probably because + * this ssrc has just collided, suggest upstream to use it */ + suggested_ssrc = rtp_session_suggest_ssrc (session, NULL); + internal_src = rtp_session_get_source_by_ssrc (session, suggested_ssrc); + if (!internal_src) + gst_structure_set (structure, "suggested-ssrc", G_TYPE_UINT, + (guint) suggested_ssrc, NULL); + else + g_object_unref (internal_src); + + event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, structure); + gst_pad_push_event (send_rtp_sink, event); + gst_object_unref (send_rtp_sink); + } } static void @@ -370,8 +459,30 @@ on_sender_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess) src->ssrc); } +static void +on_new_sender_ssrc (RTPSession * session, RTPSource * src, GstRtpSession * sess) +{ + g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC], 0, + src->ssrc); +} + +static void +on_sender_ssrc_active (RTPSession * session, RTPSource * src, + GstRtpSession * sess) +{ + g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE], 0, + src->ssrc); +} + +static void +on_notify_stats (RTPSession * session, GParamSpec * spec, + GstRtpSession * rtpsession) +{ + g_object_notify (G_OBJECT (rtpsession), "stats"); +} + #define gst_rtp_session_parent_class parent_class -G_DEFINE_TYPE (GstRtpSession, gst_rtp_session, GST_TYPE_ELEMENT); +G_DEFINE_TYPE_WITH_PRIVATE (GstRtpSession, gst_rtp_session, GST_TYPE_ELEMENT); static void gst_rtp_session_class_init (GstRtpSessionClass * klass) @@ -382,8 +493,6 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass) gobject_class = (GObjectClass *) klass; gstelement_class = (GstElementClass *) klass; - g_type_class_add_private (klass, sizeof (GstRtpSessionPrivate)); - gobject_class->finalize = gst_rtp_session_finalize; gobject_class->set_property = gst_rtp_session_set_property; gobject_class->get_property = gst_rtp_session_get_property; @@ -398,8 +507,7 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass) gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP] = g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, request_pt_map), - NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT, GST_TYPE_CAPS, 1, - G_TYPE_UINT); + NULL, NULL, g_cclosure_marshal_generic, GST_TYPE_CAPS, 1, G_TYPE_UINT); /** * GstRtpSession::clear-pt-map: * @sess: the object which received the signal @@ -408,8 +516,9 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass) */ gst_rtp_session_signals[SIGNAL_CLEAR_PT_MAP] = g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpSessionClass, clear_pt_map), - NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE); + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, + G_STRUCT_OFFSET (GstRtpSessionClass, clear_pt_map), + NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 0, G_TYPE_NONE); /** * GstRtpSession::on-new-ssrc: @@ -447,7 +556,7 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass) on_ssrc_validated), NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT); /** - * GstRtpSession::on-ssrc_active: + * GstRtpSession::on-ssrc-active: * @sess: the object which received the signal * @ssrc: the SSRC * @@ -516,6 +625,35 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass) on_sender_timeout), NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT); + /** + * GstRtpSession::on-new-sender-ssrc: + * @sess: the object which received the signal + * @ssrc: the sender SSRC + * + * Notify of a new sender SSRC that entered @session. + * + * Since: 1.8 + */ + gst_rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC] = + g_signal_new ("on-new-sender-ssrc", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_new_ssrc), + NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT); + + /** + * GstRtpSession::on-sender-ssrc-active: + * @sess: the object which received the signal + * @ssrc: the sender SSRC + * + * Notify of a sender SSRC that is active, i.e., sending RTCP. + * + * Since: 1.8 + */ + gst_rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE] = + g_signal_new ("on-sender-ssrc-active", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, + on_ssrc_active), NULL, NULL, g_cclosure_marshal_VOID__UINT, + G_TYPE_NONE, 1, G_TYPE_UINT); + g_object_class_install_property (gobject_class, PROP_BANDWIDTH, g_param_spec_double ("bandwidth", "Bandwidth", "The bandwidth of the session in bytes per second (0 for auto-discover)", @@ -564,9 +702,10 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass) g_object_class_install_property (gobject_class, PROP_USE_PIPELINE_CLOCK, g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock", - "Use the pipeline running-time to set the NTP time in the RTCP SR messages", + "Use the pipeline running-time to set the NTP time in the RTCP SR messages " + "(DEPRECATED: Use ntp-time-source property)", DEFAULT_USE_PIPELINE_CLOCK, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED)); g_object_class_install_property (gobject_class, PROP_RTCP_MIN_INTERVAL, g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval", @@ -580,6 +719,62 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass) 0, G_MAXUINT, DEFAULT_PROBATION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME, + g_param_spec_uint ("max-dropout-time", "Max dropout time", + "The maximum time (milliseconds) of missing packets tolerated.", + 0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME, + g_param_spec_uint ("max-misorder-time", "Max misorder time", + "The maximum time (milliseconds) of misordered packets tolerated.", + 0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + /** + * GstRtpSession::stats: + * + * Various session statistics. This property returns a GstStructure + * with name application/x-rtp-session-stats with the following fields: + * + * "recv-rtx-req-count G_TYPE_UINT The number of retransmission event + * received from downstream (in receiver mode) (Since 1.16) + * "sent-rtx-req-count" G_TYPE_UINT The number of retransmission event + * sent downstream (in sender mode) (Since 1.16) + * "rtx-count" G_TYPE_UINT DEPRECATED Since 1.16, same as + * "recv-rtx-req-count". + * "rtx-drop-count" G_TYPE_UINT The number of retransmission events + * dropped (due to bandwidth constraints) + * "sent-nack-count" G_TYPE_UINT Number of NACKs sent + * "recv-nack-count" G_TYPE_UINT Number of NACKs received + * "source-stats" G_TYPE_BOXED GValueArray of #RTPSource::stats for all + * RTP sources (Since 1.8) + * + * Since: 1.4 + */ + g_object_class_install_property (gobject_class, PROP_STATS, + g_param_spec_boxed ("stats", "Statistics", + "Various statistics", GST_TYPE_STRUCTURE, + G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_RTP_PROFILE, + g_param_spec_enum ("rtp-profile", "RTP Profile", + "RTP profile to use", GST_TYPE_RTP_PROFILE, DEFAULT_RTP_PROFILE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_NTP_TIME_SOURCE, + g_param_spec_enum ("ntp-time-source", "NTP Time Source", + "NTP time source for RTCP packets", + gst_rtp_ntp_time_source_get_type (), DEFAULT_NTP_TIME_SOURCE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_SEND_TIME, + g_param_spec_boolean ("rtcp-sync-send-time", "RTCP Sync Send Time", + "Use send time or capture time for RTCP sync " + "(TRUE = send time, FALSE = capture time)", + DEFAULT_RTCP_SYNC_SEND_TIME, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_session_change_state); gstelement_class->request_new_pad = @@ -590,22 +785,22 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass) klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_session_clear_pt_map); /* sink pads */ - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&rtpsession_recv_rtp_sink_template)); - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&rtpsession_recv_rtcp_sink_template)); - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&rtpsession_send_rtp_sink_template)); + gst_element_class_add_static_pad_template (gstelement_class, + &rtpsession_recv_rtp_sink_template); + gst_element_class_add_static_pad_template (gstelement_class, + &rtpsession_recv_rtcp_sink_template); + gst_element_class_add_static_pad_template (gstelement_class, + &rtpsession_send_rtp_sink_template); /* src pads */ - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&rtpsession_recv_rtp_src_template)); - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&rtpsession_sync_src_template)); - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&rtpsession_send_rtp_src_template)); - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&rtpsession_send_rtcp_src_template)); + gst_element_class_add_static_pad_template (gstelement_class, + &rtpsession_recv_rtp_src_template); + gst_element_class_add_static_pad_template (gstelement_class, + &rtpsession_sync_src_template); + gst_element_class_add_static_pad_template (gstelement_class, + &rtpsession_send_rtp_src_template); + gst_element_class_add_static_pad_template (gstelement_class, + &rtpsession_send_rtcp_src_template); gst_element_class_set_static_metadata (gstelement_class, "RTP Session", "Filter/Network/RTP", @@ -613,16 +808,24 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass) GST_DEBUG_CATEGORY_INIT (gst_rtp_session_debug, "rtpsession", 0, "RTP Session"); + + GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_recv_rtp); + GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_recv_rtcp); + GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_send_rtp); + GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_send_rtp_list); + } static void gst_rtp_session_init (GstRtpSession * rtpsession) { - rtpsession->priv = GST_RTP_SESSION_GET_PRIVATE (rtpsession); + rtpsession->priv = gst_rtp_session_get_instance_private (rtpsession); g_mutex_init (&rtpsession->priv->lock); + g_cond_init (&rtpsession->priv->cond); rtpsession->priv->sysclock = gst_system_clock_obtain (); rtpsession->priv->session = rtp_session_new (); rtpsession->priv->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK; + rtpsession->priv->rtcp_sync_send_time = DEFAULT_RTCP_SYNC_SEND_TIME; /* configure callbacks */ rtp_session_set_callbacks (rtpsession->priv->session, &callbacks, rtpsession); @@ -645,13 +848,26 @@ gst_rtp_session_init (GstRtpSession * rtpsession) (GCallback) on_timeout, rtpsession); g_signal_connect (rtpsession->priv->session, "on-sender-timeout", (GCallback) on_sender_timeout, rtpsession); + g_signal_connect (rtpsession->priv->session, "on-new-sender-ssrc", + (GCallback) on_new_sender_ssrc, rtpsession); + g_signal_connect (rtpsession->priv->session, "on-sender-ssrc-active", + (GCallback) on_sender_ssrc_active, rtpsession); + g_signal_connect (rtpsession->priv->session, "notify::stats", + (GCallback) on_notify_stats, rtpsession); rtpsession->priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify) gst_caps_unref); + rtpsession->recv_rtcp_segment_seqnum = GST_SEQNUM_INVALID; + gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED); gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED); rtpsession->priv->thread_stopped = TRUE; + + rtpsession->priv->recv_rtx_req_count = 0; + rtpsession->priv->sent_rtx_req_count = 0; + + rtpsession->priv->ntp_time_source = DEFAULT_NTP_TIME_SOURCE; } static void @@ -663,6 +879,7 @@ gst_rtp_session_finalize (GObject * object) g_hash_table_destroy (rtpsession->priv->ptmap); g_mutex_clear (&rtpsession->priv->lock); + g_cond_clear (&rtpsession->priv->cond); g_object_unref (rtpsession->priv->sysclock); g_object_unref (rtpsession->priv->session); @@ -707,6 +924,23 @@ gst_rtp_session_set_property (GObject * object, guint prop_id, case PROP_PROBATION: g_object_set_property (G_OBJECT (priv->session), "probation", value); break; + case PROP_MAX_DROPOUT_TIME: + g_object_set_property (G_OBJECT (priv->session), "max-dropout-time", + value); + break; + case PROP_MAX_MISORDER_TIME: + g_object_set_property (G_OBJECT (priv->session), "max-misorder-time", + value); + break; + case PROP_RTP_PROFILE: + g_object_set_property (G_OBJECT (priv->session), "rtp-profile", value); + break; + case PROP_NTP_TIME_SOURCE: + priv->ntp_time_source = g_value_get_enum (value); + break; + case PROP_RTCP_SYNC_SEND_TIME: + priv->rtcp_sync_send_time = g_value_get_boolean (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -761,17 +995,51 @@ gst_rtp_session_get_property (GObject * object, guint prop_id, case PROP_PROBATION: g_object_get_property (G_OBJECT (priv->session), "probation", value); break; + case PROP_MAX_DROPOUT_TIME: + g_object_get_property (G_OBJECT (priv->session), "max-dropout-time", + value); + break; + case PROP_MAX_MISORDER_TIME: + g_object_get_property (G_OBJECT (priv->session), "max-misorder-time", + value); + break; + case PROP_STATS: + g_value_take_boxed (value, gst_rtp_session_create_stats (rtpsession)); + break; + case PROP_RTP_PROFILE: + g_object_get_property (G_OBJECT (priv->session), "rtp-profile", value); + break; + case PROP_NTP_TIME_SOURCE: + g_value_set_enum (value, priv->ntp_time_source); + break; + case PROP_RTCP_SYNC_SEND_TIME: + g_value_set_boolean (value, priv->rtcp_sync_send_time); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } } +static GstStructure * +gst_rtp_session_create_stats (GstRtpSession * rtpsession) +{ + GstStructure *s; + + g_object_get (rtpsession->priv->session, "stats", &s, NULL); + gst_structure_set (s, "rtx-count", G_TYPE_UINT, + rtpsession->priv->recv_rtx_req_count, "recv-rtx-req-count", G_TYPE_UINT, + rtpsession->priv->recv_rtx_req_count, "sent-rtx-req-count", G_TYPE_UINT, + rtpsession->priv->sent_rtx_req_count, NULL); + + return s; +} + static void get_current_times (GstRtpSession * rtpsession, GstClockTime * running_time, guint64 * ntpnstime) { - guint64 ntpns; + guint64 ntpns = -1; GstClock *clock; GstClockTime base_time, rt, clock_time; @@ -781,24 +1049,42 @@ get_current_times (GstRtpSession * rtpsession, GstClockTime * running_time, gst_object_ref (clock); GST_OBJECT_UNLOCK (rtpsession); + /* get current clock time and convert to running time */ clock_time = gst_clock_get_time (clock); + rt = clock_time - base_time; if (rtpsession->priv->use_pipeline_clock) { - ntpns = clock_time - base_time; + ntpns = rt; + /* add constant to convert from 1970 based time to 1900 based time */ + ntpns += (2208988800LL * GST_SECOND); } else { - GTimeVal current; - - /* get current NTP time */ - g_get_current_time (¤t); - ntpns = GST_TIMEVAL_TO_TIME (current); + switch (rtpsession->priv->ntp_time_source) { + case GST_RTP_NTP_TIME_SOURCE_NTP: + case GST_RTP_NTP_TIME_SOURCE_UNIX:{ + GTimeVal current; + + /* get current NTP time */ + g_get_current_time (¤t); + ntpns = GST_TIMEVAL_TO_TIME (current); + + /* add constant to convert from 1970 based time to 1900 based time */ + if (rtpsession->priv->ntp_time_source == GST_RTP_NTP_TIME_SOURCE_NTP) + ntpns += (2208988800LL * GST_SECOND); + break; + } + case GST_RTP_NTP_TIME_SOURCE_RUNNING_TIME: + ntpns = rt; + break; + case GST_RTP_NTP_TIME_SOURCE_CLOCK_TIME: + ntpns = clock_time; + break; + default: + ntpns = -1; + g_assert_not_reached (); + break; + } } - /* add constant to convert from 1970 based time to 1900 based time */ - ntpns += (2208988800LL * GST_SECOND); - - /* get current clock time and convert to running time */ - rt = clock_time - base_time; - gst_object_unref (clock); } else { GST_OBJECT_UNLOCK (rtpsession); @@ -811,6 +1097,17 @@ get_current_times (GstRtpSession * rtpsession, GstClockTime * running_time, *ntpnstime = ntpns; } +/* must be called with GST_RTP_SESSION_LOCK */ +static void +signal_waiting_rtcp_thread_unlocked (GstRtpSession * rtpsession) +{ + if (rtpsession->priv->wait_send) { + GST_LOG_OBJECT (rtpsession, "signal RTCP thread"); + rtpsession->priv->wait_send = FALSE; + GST_RTP_SESSION_SIGNAL (rtpsession); + } +} + static void rtcp_thread (GstRtpSession * rtpsession) { @@ -826,6 +1123,12 @@ rtcp_thread (GstRtpSession * rtpsession) GST_RTP_SESSION_LOCK (rtpsession); + while (rtpsession->priv->wait_send) { + GST_LOG_OBJECT (rtpsession, "waiting for getting started"); + GST_RTP_SESSION_WAIT (rtpsession); + GST_LOG_OBJECT (rtpsession, "signaled..."); + } + sysclock = rtpsession->priv->sysclock; current_time = gst_clock_get_time (sysclock); @@ -925,6 +1228,7 @@ stop_rtcp_thread (GstRtpSession * rtpsession) GST_RTP_SESSION_LOCK (rtpsession); rtpsession->priv->stop_thread = TRUE; + signal_waiting_rtcp_thread_unlocked (rtpsession); if (rtpsession->priv->id) gst_clock_id_unschedule (rtpsession->priv->id); GST_RTP_SESSION_UNLOCK (rtpsession); @@ -961,6 +1265,9 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_NULL_TO_READY: break; case GST_STATE_CHANGE_READY_TO_PAUSED: + GST_RTP_SESSION_LOCK (rtpsession); + rtpsession->priv->wait_send = TRUE; + GST_RTP_SESSION_UNLOCK (rtpsession); break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: break; @@ -987,6 +1294,7 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_PAUSED_TO_READY: /* downstream is now releasing the dataflow and we can join. */ join_rtcp_thread (rtpsession); + rtp_session_reset (rtpsession->priv->session); break; case GST_STATE_CHANGE_READY_TO_NULL: break; @@ -1011,7 +1319,9 @@ return_true (gpointer key, gpointer value, gpointer user_data) static void gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession) { + GST_RTP_SESSION_LOCK (rtpsession); g_hash_table_foreach_remove (rtpsession->priv->ptmap, return_true, NULL); + GST_RTP_SESSION_UNLOCK (rtpsession); } /* called when the session manager has an RTP packet or a list of packets @@ -1058,6 +1368,7 @@ gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src, GST_RTP_SESSION_LOCK (rtpsession); if ((rtp_src = rtpsession->send_rtp_src)) gst_object_ref (rtp_src); + signal_waiting_rtcp_thread_unlocked (rtpsession); GST_RTP_SESSION_UNLOCK (rtpsession); if (rtp_src) { @@ -1076,12 +1387,65 @@ gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src, return result; } +static void +do_rtcp_events (GstRtpSession * rtpsession, GstPad * srcpad) +{ + GstCaps *caps; + GstSegment seg; + GstEvent *event; + gchar *stream_id; + gboolean have_group_id; + guint group_id; + + stream_id = + g_strdup_printf ("%08x%08x%08x%08x", g_random_int (), g_random_int (), + g_random_int (), g_random_int ()); + + GST_RTP_SESSION_LOCK (rtpsession); + if (rtpsession->recv_rtp_sink) { + event = + gst_pad_get_sticky_event (rtpsession->recv_rtp_sink, + GST_EVENT_STREAM_START, 0); + if (event) { + if (gst_event_parse_group_id (event, &group_id)) + have_group_id = TRUE; + else + have_group_id = FALSE; + gst_event_unref (event); + } else { + have_group_id = TRUE; + group_id = gst_util_group_id_next (); + } + } else { + have_group_id = TRUE; + group_id = gst_util_group_id_next (); + } + GST_RTP_SESSION_UNLOCK (rtpsession); + + event = gst_event_new_stream_start (stream_id); + rtpsession->recv_rtcp_segment_seqnum = gst_event_get_seqnum (event); + gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum); + if (have_group_id) + gst_event_set_group_id (event, group_id); + gst_pad_push_event (srcpad, event); + g_free (stream_id); + + caps = gst_caps_new_empty_simple ("application/x-rtcp"); + gst_pad_set_caps (srcpad, caps); + gst_caps_unref (caps); + + gst_segment_init (&seg, GST_FORMAT_TIME); + event = gst_event_new_segment (&seg); + gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum); + gst_pad_push_event (srcpad, event); +} + /* called when the session manager has an RTCP packet ready for further * sending. The eos flag is set when an EOS event should be sent downstream as * well. */ static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src, - GstBuffer * buffer, gboolean eos, gpointer user_data) + GstBuffer * buffer, gboolean all_sources_bye, gpointer user_data) { GstFlowReturn result; GstRtpSession *rtpsession; @@ -1094,25 +1458,28 @@ gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src, goto stopping; if ((rtcp_src = rtpsession->send_rtcp_src)) { - GstCaps *caps; - gst_object_ref (rtcp_src); GST_RTP_SESSION_UNLOCK (rtpsession); /* set rtcp caps on output pad */ - if (!(caps = gst_pad_get_current_caps (rtcp_src))) { - caps = gst_caps_new_empty_simple ("application/x-rtcp"); - gst_pad_set_caps (rtcp_src, caps); - } - gst_caps_unref (caps); + if (!gst_pad_has_current_caps (rtcp_src)) + do_rtcp_events (rtpsession, rtcp_src); GST_LOG_OBJECT (rtpsession, "sending RTCP"); result = gst_pad_push (rtcp_src, buffer); - /* we have to send EOS after this packet */ - if (eos) { + /* Forward send an EOS on the RTCP sink if we received an EOS on the + * send_rtp_sink. We don't need to check the recv_rtp_sink since in this + * case the EOS event would already have been sent */ + if (all_sources_bye && rtpsession->send_rtp_sink && + GST_PAD_IS_EOS (rtpsession->send_rtp_sink)) { + GstEvent *event; + GST_LOG_OBJECT (rtpsession, "sending EOS"); - gst_pad_push_event (rtcp_src, gst_event_new_eos ()); + + event = gst_event_new_eos (); + gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum); + gst_pad_push_event (rtcp_src, event); } gst_object_unref (rtcp_src); } else { @@ -1137,7 +1504,7 @@ stopping: /* called when the session manager has an SR RTCP packet ready for handling * inter stream synchronisation */ static GstFlowReturn -gst_rtp_session_sync_rtcp (RTPSession * sess, RTPSource * src, +gst_rtp_session_sync_rtcp (RTPSession * sess, GstBuffer * buffer, gpointer user_data) { GstFlowReturn result; @@ -1151,17 +1518,16 @@ gst_rtp_session_sync_rtcp (RTPSession * sess, RTPSource * src, goto stopping; if ((sync_src = rtpsession->sync_src)) { - GstCaps *caps; - gst_object_ref (sync_src); GST_RTP_SESSION_UNLOCK (rtpsession); - /* set rtcp caps on output pad */ - if (!(caps = gst_pad_get_current_caps (sync_src))) { - caps = gst_caps_new_empty_simple ("application/x-rtcp"); - gst_pad_set_caps (sync_src, caps); - } - gst_caps_unref (caps); + /* set rtcp caps on output pad, this happens + * when we receive RTCP muxed with RTP according + * to RFC5761. Otherwise we would have forwarded + * the events from the recv_rtcp_sink pad already + */ + if (!gst_pad_has_current_caps (sync_src)) + do_rtcp_events (rtpsession, sync_src); GST_LOG_OBJECT (rtpsession, "sending Sync RTCP"); result = gst_pad_push (sync_src, buffer); @@ -1337,6 +1703,7 @@ gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstObject * parent, } case GST_EVENT_FLUSH_STOP: gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED); + rtpsession->recv_rtcp_segment_seqnum = GST_SEQNUM_INVALID; ret = gst_pad_push_event (rtpsession->recv_rtp_src, event); break; case GST_EVENT_SEGMENT: @@ -1359,6 +1726,31 @@ gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstObject * parent, ret = gst_pad_push_event (rtpsession->recv_rtp_src, event); break; } + case GST_EVENT_EOS: + { + GstPad *rtcp_src; + + ret = + gst_pad_push_event (rtpsession->recv_rtp_src, gst_event_ref (event)); + + GST_RTP_SESSION_LOCK (rtpsession); + if ((rtcp_src = rtpsession->send_rtcp_src)) + gst_object_ref (rtcp_src); + GST_RTP_SESSION_UNLOCK (rtpsession); + + gst_event_unref (event); + + if (rtcp_src) { + event = gst_event_new_eos (); + if (rtpsession->recv_rtcp_segment_seqnum != GST_SEQNUM_INVALID) + gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum); + ret = gst_pad_push_event (rtcp_src, event); + gst_object_unref (rtcp_src); + } else { + ret = TRUE; + } + break; + } default: ret = gst_pad_push_event (rtpsession->recv_rtp_src, event); break; @@ -1394,7 +1786,7 @@ gst_rtp_session_request_remote_key_unit (GstRtpSession * rtpsession, if (pli || fir) return rtp_session_request_key_unit (rtpsession->priv->session, ssrc, - gst_clock_get_time (rtpsession->priv->sysclock), fir, count); + fir, count); } return FALSE; @@ -1428,14 +1820,59 @@ gst_rtp_session_event_recv_rtp_src (GstPad * pad, GstObject * parent, if (gst_rtp_session_request_remote_key_unit (rtpsession, ssrc, pt, all_headers, count)) forward = FALSE; + } else if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) { + guint seqnum, delay, deadline, max_delay, avg_rtt; + + GST_RTP_SESSION_LOCK (rtpsession); + rtpsession->priv->recv_rtx_req_count++; + GST_RTP_SESSION_UNLOCK (rtpsession); + + if (!gst_structure_get_uint (s, "ssrc", &ssrc)) + ssrc = -1; + if (!gst_structure_get_uint (s, "seqnum", &seqnum)) + seqnum = -1; + if (!gst_structure_get_uint (s, "delay", &delay)) + delay = 0; + if (!gst_structure_get_uint (s, "deadline", &deadline)) + deadline = 100; + if (!gst_structure_get_uint (s, "avg-rtt", &avg_rtt)) + avg_rtt = 40; + + /* remaining time to receive the packet */ + max_delay = deadline; + if (max_delay > delay) + max_delay -= delay; + /* estimated RTT */ + if (max_delay > avg_rtt) + max_delay -= avg_rtt; + else + max_delay = 0; + + if (rtp_session_request_nack (rtpsession->priv->session, ssrc, seqnum, + max_delay * GST_MSECOND)) + forward = FALSE; } break; default: break; } - if (forward) - ret = gst_pad_push_event (rtpsession->recv_rtp_sink, event); + if (forward) { + GstPad *recv_rtp_sink; + + GST_RTP_SESSION_LOCK (rtpsession); + if ((recv_rtp_sink = rtpsession->recv_rtp_sink)) + gst_object_ref (recv_rtp_sink); + GST_RTP_SESSION_UNLOCK (rtpsession); + + if (recv_rtp_sink) { + ret = gst_pad_push_event (recv_rtp_sink, event); + gst_object_unref (recv_rtp_sink); + } else + gst_event_unref (event); + } else { + gst_event_unref (event); + } return ret; } @@ -1470,6 +1907,8 @@ gst_rtp_session_iterate_internal_links (GstPad * pad, GstObject * parent) it = gst_iterator_new_single (GST_TYPE_PAD, &val); g_value_unset (&val); gst_object_unref (otherpad); + } else { + it = gst_iterator_new_single (GST_TYPE_PAD, NULL); } return it; @@ -1498,26 +1937,32 @@ gst_rtp_session_chain_recv_rtp (GstPad * pad, GstObject * parent, GstFlowReturn ret; GstClockTime current_time, running_time; GstClockTime timestamp; + guint64 ntpnstime; rtpsession = GST_RTP_SESSION (parent); priv = rtpsession->priv; GST_LOG_OBJECT (rtpsession, "received RTP packet"); + GST_RTP_SESSION_LOCK (rtpsession); + signal_waiting_rtcp_thread_unlocked (rtpsession); + GST_RTP_SESSION_UNLOCK (rtpsession); + /* get NTP time when this packet was captured, this depends on the timestamp. */ - timestamp = GST_BUFFER_TIMESTAMP (buffer); + timestamp = GST_BUFFER_PTS (buffer); if (GST_CLOCK_TIME_IS_VALID (timestamp)) { /* convert to running time using the segment values */ running_time = gst_segment_to_running_time (&rtpsession->recv_rtp_seg, GST_FORMAT_TIME, timestamp); + ntpnstime = GST_CLOCK_TIME_NONE; } else { - get_current_times (rtpsession, &running_time, NULL); + get_current_times (rtpsession, &running_time, &ntpnstime); } current_time = gst_clock_get_time (priv->sysclock); ret = rtp_session_process_rtp (priv->session, buffer, current_time, - running_time); + running_time, ntpnstime); if (ret != GST_FLOW_OK) goto push_error; @@ -1547,6 +1992,18 @@ gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstObject * parent, GST_EVENT_TYPE_NAME (event)); switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_SEGMENT: + /* Make sure that the sync_src pad has caps before the segment event. + * Otherwise we might get a segment event before caps from the receive + * RTCP pad, and then later when receiving RTCP packets will set caps. + * This will results in a sticky event misordering warning + */ + if (!gst_pad_has_current_caps (rtpsession->sync_src)) { + GstCaps *caps = gst_caps_new_empty_simple ("application/x-rtcp"); + gst_pad_set_caps (rtpsession->sync_src, caps); + gst_caps_unref (caps); + } + /* fall through */ default: ret = gst_pad_push_event (rtpsession->sync_src, event); break; @@ -1565,6 +2022,7 @@ gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstObject * parent, GstRtpSession *rtpsession; GstRtpSessionPrivate *priv; GstClockTime current_time; + GstClockTime running_time; guint64 ntpnstime; rtpsession = GST_RTP_SESSION (parent); @@ -1572,10 +2030,15 @@ gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstObject * parent, GST_LOG_OBJECT (rtpsession, "received RTCP packet"); + GST_RTP_SESSION_LOCK (rtpsession); + signal_waiting_rtcp_thread_unlocked (rtpsession); + GST_RTP_SESSION_UNLOCK (rtpsession); + current_time = gst_clock_get_time (priv->sysclock); - get_current_times (rtpsession, NULL, &ntpnstime); + get_current_times (rtpsession, &running_time, &ntpnstime); - rtp_session_process_rtcp (priv->session, buffer, current_time, ntpnstime); + rtp_session_process_rtcp (priv->session, buffer, current_time, running_time, + ntpnstime); return GST_FLOW_OK; /* always return OK */ } @@ -1589,7 +2052,8 @@ gst_rtp_session_query_send_rtcp_src (GstPad * pad, GstObject * parent, rtpsession = GST_RTP_SESSION (parent); - GST_DEBUG_OBJECT (rtpsession, "received QUERY"); + GST_DEBUG_OBJECT (rtpsession, "received QUERY %s", + GST_QUERY_TYPE_NAME (query)); switch (GST_QUERY_TYPE (query)) { case GST_QUERY_LATENCY: @@ -1613,7 +2077,8 @@ gst_rtp_session_event_send_rtcp_src (GstPad * pad, GstObject * parent, gboolean ret = TRUE; rtpsession = GST_RTP_SESSION (parent); - GST_DEBUG_OBJECT (rtpsession, "received EVENT"); + GST_DEBUG_OBJECT (rtpsession, "received EVENT %s", + GST_EVENT_TYPE_NAME (event)); switch (GST_EVENT_TYPE (event)) { case GST_EVENT_SEEK: @@ -1641,7 +2106,8 @@ gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstObject * parent, rtpsession = GST_RTP_SESSION (parent); - GST_DEBUG_OBJECT (rtpsession, "received event"); + GST_DEBUG_OBJECT (rtpsession, "received EVENT %s", + GST_EVENT_TYPE_NAME (event)); switch (GST_EVENT_TYPE (event)) { case GST_EVENT_CAPS: @@ -1684,16 +2150,18 @@ gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstObject * parent, * because we stop sending. */ ret = gst_pad_push_event (rtpsession->send_rtp_src, event); current_time = gst_clock_get_time (rtpsession->priv->sysclock); + GST_DEBUG_OBJECT (rtpsession, "scheduling BYE message"); - rtp_session_schedule_bye (rtpsession->priv->session, "End of stream", - current_time); + rtp_session_mark_all_bye (rtpsession->priv->session, "End Of Stream"); + rtp_session_schedule_bye (rtpsession->priv->session, current_time); break; } default:{ - GstPad *send_rtp_src = NULL; + GstPad *send_rtp_src; + GST_RTP_SESSION_LOCK (rtpsession); - if (rtpsession->send_rtp_src) - send_rtp_src = gst_object_ref (rtpsession->send_rtp_src); + if ((send_rtp_src = rtpsession->send_rtp_src)) + gst_object_ref (send_rtp_src); GST_RTP_SESSION_UNLOCK (rtpsession); if (send_rtp_src) { @@ -1709,6 +2177,33 @@ gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstObject * parent, return ret; } +static gboolean +gst_rtp_session_event_send_rtp_src (GstPad * pad, GstObject * parent, + GstEvent * event) +{ + GstRtpSession *rtpsession; + gboolean ret = FALSE; + + rtpsession = GST_RTP_SESSION (parent); + + GST_DEBUG_OBJECT (rtpsession, "received EVENT %s", + GST_EVENT_TYPE_NAME (event)); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_LATENCY: + /* save the latency, we need this to know when an RTP packet will be + * rendered by the sink */ + gst_event_parse_latency (event, &rtpsession->priv->send_latency); + + ret = gst_pad_event_default (pad, parent, event); + break; + default: + ret = gst_pad_event_default (pad, parent, event); + break; + } + return ret; +} + static GstCaps * gst_rtp_session_getcaps_send_rtp (GstPad * pad, GstRtpSession * rtpsession, GstCaps * filter) @@ -1717,18 +2212,26 @@ gst_rtp_session_getcaps_send_rtp (GstPad * pad, GstRtpSession * rtpsession, GstCaps *result; GstStructure *s1, *s2; guint ssrc; + gboolean is_random; priv = rtpsession->priv; - ssrc = rtp_session_get_internal_ssrc (priv->session); + ssrc = rtp_session_suggest_ssrc (priv->session, &is_random); /* we can basically accept anything but we prefer to receive packets with our * internal SSRC so that we don't have to patch it. Create a structure with - * the SSRC and another one without. */ - s1 = gst_structure_new ("application/x-rtp", "ssrc", G_TYPE_UINT, ssrc, NULL); - s2 = gst_structure_new_empty ("application/x-rtp"); - - result = gst_caps_new_full (s1, s2, NULL); + * the SSRC and another one without. + * Only do this if the session actually decided on an ssrc already, + * otherwise we give upstream the opportunity to select an ssrc itself */ + if (!is_random) { + s1 = gst_structure_new ("application/x-rtp", "ssrc", G_TYPE_UINT, ssrc, + NULL); + s2 = gst_structure_new_empty ("application/x-rtp"); + + result = gst_caps_new_full (s1, s2, NULL); + } else { + result = gst_caps_new_empty_simple ("application/x-rtp"); + } if (filter) { GstCaps *caps = result; @@ -1776,19 +2279,15 @@ gst_rtp_session_setcaps_send_rtp (GstPad * pad, GstRtpSession * rtpsession, GstCaps * caps) { GstRtpSessionPrivate *priv; - GstStructure *s = gst_caps_get_structure (caps, 0); - guint ssrc; priv = rtpsession->priv; - if (gst_structure_get_uint (s, "ssrc", &ssrc)) { - GST_DEBUG_OBJECT (rtpsession, "setting internal SSRC to %08x", ssrc); - rtp_session_set_internal_ssrc (priv->session, ssrc); - } + rtp_session_update_send_caps (priv->session, caps); + return TRUE; } -/* Recieve an RTP packet or a list of packets to be send to the receivers, +/* Receive an RTP packet or a list of packets to be sent to the receivers, * send to RTP session manager and forward to send_rtp_src. */ static GstFlowReturn @@ -1808,15 +2307,15 @@ gst_rtp_session_chain_send_rtp_common (GstRtpSession * rtpsession, if (is_list) { GstBuffer *buffer = NULL; - /* All groups in an list have the same timestamp. - * So, just take it from the first group. */ + /* All buffers in a list have the same timestamp. + * So, just take it from the first buffer. */ buffer = gst_buffer_list_get (GST_BUFFER_LIST_CAST (data), 0); if (buffer) - timestamp = GST_BUFFER_TIMESTAMP (buffer); + timestamp = GST_BUFFER_PTS (buffer); else timestamp = -1; } else { - timestamp = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (data)); + timestamp = GST_BUFFER_PTS (GST_BUFFER_CAST (data)); } if (GST_CLOCK_TIME_IS_VALID (timestamp)) { @@ -1824,6 +2323,8 @@ gst_rtp_session_chain_send_rtp_common (GstRtpSession * rtpsession, running_time = gst_segment_to_running_time (&rtpsession->send_rtp_seg, GST_FORMAT_TIME, timestamp); + if (priv->rtcp_sync_send_time) + running_time += priv->send_latency; } else { /* no timestamp. */ running_time = -1; @@ -1880,9 +2381,10 @@ create_recv_rtp_sink (GstRtpSession * rtpsession) gst_pad_set_chain_function (rtpsession->recv_rtp_sink, gst_rtp_session_chain_recv_rtp); gst_pad_set_event_function (rtpsession->recv_rtp_sink, - (GstPadEventFunction) gst_rtp_session_event_recv_rtp_sink); + gst_rtp_session_event_recv_rtp_sink); gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtp_sink, gst_rtp_session_iterate_internal_links); + GST_PAD_SET_PROXY_ALLOCATION (rtpsession->recv_rtp_sink); gst_pad_set_active (rtpsession->recv_rtp_sink, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_sink); @@ -1892,7 +2394,7 @@ create_recv_rtp_sink (GstRtpSession * rtpsession) gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template, "recv_rtp_src"); gst_pad_set_event_function (rtpsession->recv_rtp_src, - (GstPadEventFunction) gst_rtp_session_event_recv_rtp_src); + gst_rtp_session_event_recv_rtp_src); gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtp_src, gst_rtp_session_iterate_internal_links); gst_pad_use_fixed_caps (rtpsession->recv_rtp_src); @@ -1939,7 +2441,7 @@ create_recv_rtcp_sink (GstRtpSession * rtpsession) gst_pad_set_chain_function (rtpsession->recv_rtcp_sink, gst_rtp_session_chain_recv_rtcp); gst_pad_set_event_function (rtpsession->recv_rtcp_sink, - (GstPadEventFunction) gst_rtp_session_event_recv_rtcp_sink); + gst_rtp_session_event_recv_rtcp_sink); gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtcp_sink, gst_rtp_session_iterate_internal_links); gst_pad_set_active (rtpsession->recv_rtcp_sink, TRUE); @@ -1994,9 +2496,11 @@ create_send_rtp_sink (GstRtpSession * rtpsession) gst_pad_set_query_function (rtpsession->send_rtp_sink, gst_rtp_session_query_send_rtp); gst_pad_set_event_function (rtpsession->send_rtp_sink, - (GstPadEventFunction) gst_rtp_session_event_send_rtp_sink); + gst_rtp_session_event_send_rtp_sink); gst_pad_set_iterate_internal_links_function (rtpsession->send_rtp_sink, gst_rtp_session_iterate_internal_links); + GST_PAD_SET_PROXY_CAPS (rtpsession->send_rtp_sink); + GST_PAD_SET_PROXY_ALLOCATION (rtpsession->send_rtp_sink); gst_pad_set_active (rtpsession->send_rtp_sink, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_sink); @@ -2006,6 +2510,9 @@ create_send_rtp_sink (GstRtpSession * rtpsession) "send_rtp_src"); gst_pad_set_iterate_internal_links_function (rtpsession->send_rtp_src, gst_rtp_session_iterate_internal_links); + gst_pad_set_event_function (rtpsession->send_rtp_src, + gst_rtp_session_event_send_rtp_src); + GST_PAD_SET_PROXY_CAPS (rtpsession->send_rtp_src); gst_pad_set_active (rtpsession->send_rtp_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src); @@ -2120,13 +2627,13 @@ gst_rtp_session_request_new_pad (GstElement * element, wrong_template: { GST_RTP_SESSION_UNLOCK (rtpsession); - g_warning ("gstrtpsession: this is not our template"); + g_warning ("rtpsession: this is not our template"); return NULL; } exists: { GST_RTP_SESSION_UNLOCK (rtpsession); - g_warning ("gstrtpsession: pad already requested"); + g_warning ("rtpsession: pad already requested"); return NULL; } } @@ -2164,22 +2671,31 @@ gst_rtp_session_release_pad (GstElement * element, GstPad * pad) wrong_pad: { GST_RTP_SESSION_UNLOCK (rtpsession); - g_warning ("gstrtpsession: asked to release an unknown pad"); + g_warning ("rtpsession: asked to release an unknown pad"); return; } } static void gst_rtp_session_request_key_unit (RTPSession * sess, - gboolean all_headers, gpointer user_data) + guint32 ssrc, gboolean all_headers, gpointer user_data) { GstRtpSession *rtpsession = GST_RTP_SESSION (user_data); GstEvent *event; + GstPad *send_rtp_sink; + + GST_RTP_SESSION_LOCK (rtpsession); + if ((send_rtp_sink = rtpsession->send_rtp_sink)) + gst_object_ref (send_rtp_sink); + GST_RTP_SESSION_UNLOCK (rtpsession); - event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, - gst_structure_new ("GstForceKeyUnit", - "all-headers", G_TYPE_BOOLEAN, all_headers, NULL)); - gst_pad_push_event (rtpsession->send_rtp_sink, event); + if (send_rtp_sink) { + event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, + gst_structure_new ("GstForceKeyUnit", "ssrc", G_TYPE_UINT, ssrc, + "all-headers", G_TYPE_BOOLEAN, all_headers, NULL)); + gst_pad_push_event (send_rtp_sink, event); + gst_object_unref (send_rtp_sink); + } } static GstClockTime @@ -2189,3 +2705,71 @@ gst_rtp_session_request_time (RTPSession * session, gpointer user_data) return gst_clock_get_time (rtpsession->priv->sysclock); } + +static void +gst_rtp_session_notify_nack (RTPSession * sess, guint16 seqnum, + guint16 blp, guint32 ssrc, gpointer user_data) +{ + GstRtpSession *rtpsession = GST_RTP_SESSION (user_data); + GstEvent *event; + GstPad *send_rtp_sink; + + GST_RTP_SESSION_LOCK (rtpsession); + if ((send_rtp_sink = rtpsession->send_rtp_sink)) + gst_object_ref (send_rtp_sink); + GST_RTP_SESSION_UNLOCK (rtpsession); + + if (send_rtp_sink) { + while (TRUE) { + event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, + gst_structure_new ("GstRTPRetransmissionRequest", + "seqnum", G_TYPE_UINT, (guint) seqnum, + "ssrc", G_TYPE_UINT, (guint) ssrc, NULL)); + gst_pad_push_event (send_rtp_sink, event); + + GST_RTP_SESSION_LOCK (rtpsession); + rtpsession->priv->sent_rtx_req_count++; + GST_RTP_SESSION_UNLOCK (rtpsession); + + if (blp == 0) + break; + + seqnum++; + while ((blp & 1) == 0) { + seqnum++; + blp >>= 1; + } + blp >>= 1; + } + gst_object_unref (send_rtp_sink); + } +} + +static void +gst_rtp_session_reconfigure (RTPSession * sess, gpointer user_data) +{ + GstRtpSession *rtpsession = GST_RTP_SESSION (user_data); + GstPad *send_rtp_sink; + + GST_RTP_SESSION_LOCK (rtpsession); + if ((send_rtp_sink = rtpsession->send_rtp_sink)) + gst_object_ref (send_rtp_sink); + GST_RTP_SESSION_UNLOCK (rtpsession); + + if (send_rtp_sink) { + gst_pad_push_event (send_rtp_sink, gst_event_new_reconfigure ()); + gst_object_unref (send_rtp_sink); + } +} + +static void +gst_rtp_session_notify_early_rtcp (RTPSession * sess, gpointer user_data) +{ + GstRtpSession *rtpsession = GST_RTP_SESSION (user_data); + + GST_DEBUG_OBJECT (rtpsession, "Notified of early RTCP"); + /* with an early RTCP request, we might have to start the RTCP thread */ + GST_RTP_SESSION_LOCK (rtpsession); + signal_waiting_rtcp_thread_unlocked (rtpsession); + GST_RTP_SESSION_UNLOCK (rtpsession); +}