rtpmanager: Update codes based on 1.18.4
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-rtpsession
22  * @title: rtpsession
23  * @see_also: rtpjitterbuffer, rtpbin, rtpptdemux, rtpssrcdemux
24  *
25  * The RTP session manager models participants with unique SSRC in an RTP
26  * session. This session can be used to send and receive RTP and RTCP packets.
27  * Based on what REQUEST pads are requested from the session manager, specific
28  * functionality can be activated.
29  *
30  * The session manager currently implements RFC 3550 including:
31  *
32  *   * RTP packet validation based on consecutive sequence numbers.
33  *
34  *   * Maintenance of the SSRC participant database.
35  *
36  *   * Keeping per participant statistics based on received RTCP packets.
37  *
38  *   * Scheduling of RR/SR RTCP packets.
39  *
40  *   * Support for multiple sender SSRC.
41  *
42  * The rtpsession will not demux packets based on SSRC or payload type, nor will
43  * it correct for packet reordering and jitter. Use #GstRtpSsrcDemux,
44  * #GstRtpPtDemux and GstRtpJitterBuffer in addition to #GstRtpSession to
45  * perform these tasks. It is usually a good idea to use #GstRtpBin, which
46  * combines all these features in one element.
47  *
48  * To use #GstRtpSession as an RTP receiver, request a recv_rtp_sink pad, which will
49  * automatically create recv_rtp_src pad. Data received on the recv_rtp_sink pad
50  * will be processed in the session and after being validated forwarded on the
51  * recv_rtp_src pad.
52  *
53  * To also use #GstRtpSession as an RTCP receiver, request a recv_rtcp_sink pad,
54  * which will automatically create a sync_src pad. Packets received on the RTCP
55  * pad will be used by the session manager to update the stats and database of
56  * the other participants. SR packets will be forwarded on the sync_src pad
57  * so that they can be used to perform inter-stream synchronisation when needed.
58  *
59  * If you want the session manager to generate and send RTCP packets, request
60  * the send_rtcp_src pad. Packet pushed on this pad contain SR/RR RTCP reports
61  * that should be sent to all participants in the session.
62  *
63  * To use #GstRtpSession as a sender, request a send_rtp_sink pad, which will
64  * automatically create a send_rtp_src pad. The session manager will
65  * forward the packets on the send_rtp_src pad after updating its internal state.
66  *
67  * The session manager needs the clock-rate of the payload types it is handling
68  * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
69  * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
70  * signal.
71  *
72  * ## Example pipelines
73  * |[
74  * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink rtpsession .recv_rtp_src ! rtptheoradepay ! theoradec ! xvimagesink
75  * ]| Receive theora RTP packets from port 5000 and send them to the depayloader,
76  * decoder and display. Note that the application/x-rtp caps on udpsrc should be
77  * configured based on some negotiation process such as RTSP for this pipeline
78  * to work correctly.
79  * |[
80  * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink rtpsession name=session \
81  *        .recv_rtp_src ! rtptheoradepay ! theoradec ! xvimagesink \
82  *     udpsrc port=5001 caps="application/x-rtcp" ! session.recv_rtcp_sink
83  * ]| Receive theora RTP packets from port 5000 and send them to the depayloader,
84  * decoder and display. Receive RTCP packets from port 5001 and process them in
85  * the session manager.
86  * Note that the application/x-rtp caps on udpsrc should be
87  * configured based on some negotiation process such as RTSP for this pipeline
88  * to work correctly.
89  * |[
90  * gst-launch-1.0 videotestsrc ! theoraenc ! rtptheorapay ! .send_rtp_sink rtpsession .send_rtp_src ! udpsink port=5000
91  * ]| Send theora RTP packets through the session manager and out on UDP port
92  * 5000.
93  * |[
94  * gst-launch-1.0 videotestsrc ! theoraenc ! rtptheorapay ! .send_rtp_sink rtpsession name=session .send_rtp_src \
95  *     ! udpsink port=5000  session.send_rtcp_src ! udpsink port=5001
96  * ]| Send theora RTP packets through the session manager and out on UDP port
97  * 5000. Send RTCP packets on port 5001. Note that this pipeline will not preroll
98  * correctly because the second udpsink will not preroll correctly (no RTCP
99  * packets are sent in the PAUSED state). Applications should manually set and
100  * keep (see gst_element_set_locked_state()) the RTCP udpsink to the PLAYING state.
101  *
102  */
103
104 #ifdef HAVE_CONFIG_H
105 #include "config.h"
106 #endif
107
108 #include <gst/rtp/gstrtpbuffer.h>
109
110 #include <gst/glib-compat-private.h>
111
112 #include "gstrtpsession.h"
113 #include "rtpsession.h"
114
115 GST_DEBUG_CATEGORY_STATIC (gst_rtp_session_debug);
116 #define GST_CAT_DEFAULT gst_rtp_session_debug
117
118 #define GST_TYPE_RTP_NTP_TIME_SOURCE (gst_rtp_ntp_time_source_get_type ())
119 GType
120 gst_rtp_ntp_time_source_get_type (void)
121 {
122   static GType type = 0;
123   static const GEnumValue values[] = {
124     {GST_RTP_NTP_TIME_SOURCE_NTP, "NTP time based on realtime clock", "ntp"},
125     {GST_RTP_NTP_TIME_SOURCE_UNIX, "UNIX time based on realtime clock", "unix"},
126     {GST_RTP_NTP_TIME_SOURCE_RUNNING_TIME,
127           "Running time based on pipeline clock",
128         "running-time"},
129     {GST_RTP_NTP_TIME_SOURCE_CLOCK_TIME, "Pipeline clock time", "clock-time"},
130     {0, NULL, NULL},
131   };
132
133   if (!type) {
134     type = g_enum_register_static ("GstRtpNtpTimeSource", values);
135   }
136   return type;
137 }
138
139 /* sink pads */
140 static GstStaticPadTemplate rtpsession_recv_rtp_sink_template =
141 GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink",
142     GST_PAD_SINK,
143     GST_PAD_REQUEST,
144     GST_STATIC_CAPS ("application/x-rtp")
145     );
146
147 static GstStaticPadTemplate rtpsession_recv_rtcp_sink_template =
148 GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink",
149     GST_PAD_SINK,
150     GST_PAD_REQUEST,
151     GST_STATIC_CAPS ("application/x-rtcp")
152     );
153
154 static GstStaticPadTemplate rtpsession_send_rtp_sink_template =
155 GST_STATIC_PAD_TEMPLATE ("send_rtp_sink",
156     GST_PAD_SINK,
157     GST_PAD_REQUEST,
158     GST_STATIC_CAPS ("application/x-rtp")
159     );
160
161 /* src pads */
162 static GstStaticPadTemplate rtpsession_recv_rtp_src_template =
163 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src",
164     GST_PAD_SRC,
165     GST_PAD_SOMETIMES,
166     GST_STATIC_CAPS ("application/x-rtp")
167     );
168
169 static GstStaticPadTemplate rtpsession_sync_src_template =
170 GST_STATIC_PAD_TEMPLATE ("sync_src",
171     GST_PAD_SRC,
172     GST_PAD_SOMETIMES,
173     GST_STATIC_CAPS ("application/x-rtcp")
174     );
175
176 static GstStaticPadTemplate rtpsession_send_rtp_src_template =
177 GST_STATIC_PAD_TEMPLATE ("send_rtp_src",
178     GST_PAD_SRC,
179     GST_PAD_SOMETIMES,
180     GST_STATIC_CAPS ("application/x-rtp")
181     );
182
183 static GstStaticPadTemplate rtpsession_send_rtcp_src_template =
184 GST_STATIC_PAD_TEMPLATE ("send_rtcp_src",
185     GST_PAD_SRC,
186     GST_PAD_REQUEST,
187     GST_STATIC_CAPS ("application/x-rtcp")
188     );
189
190 /* signals and args */
191 enum
192 {
193   SIGNAL_REQUEST_PT_MAP,
194   SIGNAL_CLEAR_PT_MAP,
195
196   SIGNAL_ON_NEW_SSRC,
197   SIGNAL_ON_SSRC_COLLISION,
198   SIGNAL_ON_SSRC_VALIDATED,
199   SIGNAL_ON_SSRC_ACTIVE,
200   SIGNAL_ON_SSRC_SDES,
201   SIGNAL_ON_BYE_SSRC,
202   SIGNAL_ON_BYE_TIMEOUT,
203   SIGNAL_ON_TIMEOUT,
204   SIGNAL_ON_SENDER_TIMEOUT,
205   SIGNAL_ON_NEW_SENDER_SSRC,
206   SIGNAL_ON_SENDER_SSRC_ACTIVE,
207   LAST_SIGNAL
208 };
209
210 #define DEFAULT_BANDWIDTH            0
211 #define DEFAULT_RTCP_FRACTION        RTP_STATS_RTCP_FRACTION
212 #define DEFAULT_RTCP_RR_BANDWIDTH    -1
213 #define DEFAULT_RTCP_RS_BANDWIDTH    -1
214 #define DEFAULT_SDES                 NULL
215 #define DEFAULT_NUM_SOURCES          0
216 #define DEFAULT_NUM_ACTIVE_SOURCES   0
217 #define DEFAULT_USE_PIPELINE_CLOCK   FALSE
218 #define DEFAULT_RTCP_MIN_INTERVAL    (RTP_STATS_MIN_INTERVAL * GST_SECOND)
219 #define DEFAULT_PROBATION            RTP_DEFAULT_PROBATION
220 #define DEFAULT_MAX_DROPOUT_TIME     60000
221 #define DEFAULT_MAX_MISORDER_TIME    2000
222 #define DEFAULT_RTP_PROFILE          GST_RTP_PROFILE_AVP
223 #define DEFAULT_NTP_TIME_SOURCE      GST_RTP_NTP_TIME_SOURCE_NTP
224 #define DEFAULT_RTCP_SYNC_SEND_TIME  TRUE
225
226 enum
227 {
228   PROP_0,
229   PROP_BANDWIDTH,
230   PROP_RTCP_FRACTION,
231   PROP_RTCP_RR_BANDWIDTH,
232   PROP_RTCP_RS_BANDWIDTH,
233   PROP_SDES,
234   PROP_NUM_SOURCES,
235   PROP_NUM_ACTIVE_SOURCES,
236   PROP_INTERNAL_SESSION,
237   PROP_USE_PIPELINE_CLOCK,
238   PROP_RTCP_MIN_INTERVAL,
239   PROP_PROBATION,
240   PROP_MAX_DROPOUT_TIME,
241   PROP_MAX_MISORDER_TIME,
242   PROP_STATS,
243   PROP_TWCC_STATS,
244   PROP_RTP_PROFILE,
245   PROP_NTP_TIME_SOURCE,
246   PROP_RTCP_SYNC_SEND_TIME
247 };
248
249 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock (&(sess)->priv->lock)
250 #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock (&(sess)->priv->lock)
251
252 #define GST_RTP_SESSION_WAIT(sess)   g_cond_wait (&(sess)->priv->cond, &(sess)->priv->lock)
253 #define GST_RTP_SESSION_SIGNAL(sess) g_cond_signal (&(sess)->priv->cond)
254
255 struct _GstRtpSessionPrivate
256 {
257   GMutex lock;
258   GCond cond;
259   GstClock *sysclock;
260
261   RTPSession *session;
262
263   /* thread for sending out RTCP */
264   GstClockID id;
265   gboolean stop_thread;
266   GThread *thread;
267   gboolean thread_stopped;
268   gboolean wait_send;
269
270   /* caps mapping */
271   GHashTable *ptmap;
272
273   GstClockTime send_latency;
274
275   gboolean use_pipeline_clock;
276   GstRtpNtpTimeSource ntp_time_source;
277   gboolean rtcp_sync_send_time;
278
279   guint recv_rtx_req_count;
280   guint sent_rtx_req_count;
281
282   GstStructure *last_twcc_stats;
283
284   /*
285    * This is the list of processed packets in the receive path when upstream
286    * pushed a buffer list.
287    */
288   GstBufferList *processed_list;
289 };
290
291 /* callbacks to handle actions from the session manager */
292 static GstFlowReturn gst_rtp_session_process_rtp (RTPSession * sess,
293     RTPSource * src, GstBuffer * buffer, gpointer user_data);
294 static GstFlowReturn gst_rtp_session_send_rtp (RTPSession * sess,
295     RTPSource * src, gpointer data, gpointer user_data);
296 static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess,
297     RTPSource * src, GstBuffer * buffer, gboolean eos, gpointer user_data);
298 static GstFlowReturn gst_rtp_session_sync_rtcp (RTPSession * sess,
299     GstBuffer * buffer, gpointer user_data);
300 static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
301     gpointer user_data);
302 static void gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data);
303 static void gst_rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc,
304     gboolean all_headers, gpointer user_data);
305 static GstClockTime gst_rtp_session_request_time (RTPSession * session,
306     gpointer user_data);
307 static void gst_rtp_session_notify_nack (RTPSession * sess,
308     guint16 seqnum, guint16 blp, guint32 ssrc, gpointer user_data);
309 static void gst_rtp_session_notify_twcc (RTPSession * sess,
310     GstStructure * twcc_packets, GstStructure * twcc_stats, gpointer user_data);
311 static void gst_rtp_session_reconfigure (RTPSession * sess, gpointer user_data);
312 static void gst_rtp_session_notify_early_rtcp (RTPSession * sess,
313     gpointer user_data);
314 static GstFlowReturn gst_rtp_session_chain_recv_rtp (GstPad * pad,
315     GstObject * parent, GstBuffer * buffer);
316 static GstFlowReturn gst_rtp_session_chain_recv_rtp_list (GstPad * pad,
317     GstObject * parent, GstBufferList * list);
318 static GstFlowReturn gst_rtp_session_chain_recv_rtcp (GstPad * pad,
319     GstObject * parent, GstBuffer * buffer);
320 static GstFlowReturn gst_rtp_session_chain_send_rtp (GstPad * pad,
321     GstObject * parent, GstBuffer * buffer);
322 static GstFlowReturn gst_rtp_session_chain_send_rtp_list (GstPad * pad,
323     GstObject * parent, GstBufferList * list);
324
325 static RTPSessionCallbacks callbacks = {
326   gst_rtp_session_process_rtp,
327   gst_rtp_session_send_rtp,
328   gst_rtp_session_sync_rtcp,
329   gst_rtp_session_send_rtcp,
330   gst_rtp_session_clock_rate,
331   gst_rtp_session_reconsider,
332   gst_rtp_session_request_key_unit,
333   gst_rtp_session_request_time,
334   gst_rtp_session_notify_nack,
335   gst_rtp_session_notify_twcc,
336   gst_rtp_session_reconfigure,
337   gst_rtp_session_notify_early_rtcp
338 };
339
340 /* GObject vmethods */
341 static void gst_rtp_session_finalize (GObject * object);
342 static void gst_rtp_session_set_property (GObject * object, guint prop_id,
343     const GValue * value, GParamSpec * pspec);
344 static void gst_rtp_session_get_property (GObject * object, guint prop_id,
345     GValue * value, GParamSpec * pspec);
346
347 /* GstElement vmethods */
348 static GstStateChangeReturn gst_rtp_session_change_state (GstElement * element,
349     GstStateChange transition);
350 static GstPad *gst_rtp_session_request_new_pad (GstElement * element,
351     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
352 static void gst_rtp_session_release_pad (GstElement * element, GstPad * pad);
353
354 static gboolean gst_rtp_session_sink_setcaps (GstPad * pad,
355     GstRtpSession * rtpsession, GstCaps * caps);
356 static gboolean gst_rtp_session_setcaps_send_rtp (GstPad * pad,
357     GstRtpSession * rtpsession, GstCaps * caps);
358
359 static void gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession);
360
361 static GstStructure *gst_rtp_session_create_stats (GstRtpSession * rtpsession);
362
363 static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 };
364
365 static void
366 on_new_ssrc (RTPSession * session, RTPSource * src, GstRtpSession * sess)
367 {
368   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0,
369       src->ssrc);
370 }
371
372 static void
373 on_ssrc_collision (RTPSession * session, RTPSource * src, GstRtpSession * sess)
374 {
375   GstPad *send_rtp_sink;
376
377   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
378       src->ssrc);
379
380   GST_RTP_SESSION_LOCK (sess);
381   if ((send_rtp_sink = sess->send_rtp_sink))
382     gst_object_ref (send_rtp_sink);
383   GST_RTP_SESSION_UNLOCK (sess);
384
385   if (send_rtp_sink) {
386     GstStructure *structure;
387     GstEvent *event;
388     RTPSource *internal_src;
389     guint32 suggested_ssrc;
390
391     structure = gst_structure_new ("GstRTPCollision", "ssrc", G_TYPE_UINT,
392         (guint) src->ssrc, NULL);
393
394     /* if there is no source using the suggested ssrc, most probably because
395      * this ssrc has just collided, suggest upstream to use it */
396     suggested_ssrc = rtp_session_suggest_ssrc (session, NULL);
397     internal_src = rtp_session_get_source_by_ssrc (session, suggested_ssrc);
398     if (!internal_src)
399       gst_structure_set (structure, "suggested-ssrc", G_TYPE_UINT,
400           (guint) suggested_ssrc, NULL);
401     else
402       g_object_unref (internal_src);
403
404     event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, structure);
405     gst_pad_push_event (send_rtp_sink, event);
406     gst_object_unref (send_rtp_sink);
407   }
408 }
409
410 static void
411 on_ssrc_validated (RTPSession * session, RTPSource * src, GstRtpSession * sess)
412 {
413   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
414       src->ssrc);
415 }
416
417 static void
418 on_ssrc_active (RTPSession * session, RTPSource * src, GstRtpSession * sess)
419 {
420   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0,
421       src->ssrc);
422 }
423
424 static void
425 on_ssrc_sdes (RTPSession * session, RTPSource * src, GstRtpSession * sess)
426 {
427   GstStructure *s;
428   GstMessage *m;
429
430   /* convert the new SDES info into a message */
431   RTP_SESSION_LOCK (session);
432   g_object_get (src, "sdes", &s, NULL);
433   RTP_SESSION_UNLOCK (session);
434
435   m = gst_message_new_custom (GST_MESSAGE_ELEMENT, GST_OBJECT (sess), s);
436   gst_element_post_message (GST_ELEMENT_CAST (sess), m);
437
438   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0,
439       src->ssrc);
440 }
441
442 static void
443 on_bye_ssrc (RTPSession * session, RTPSource * src, GstRtpSession * sess)
444 {
445   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0,
446       src->ssrc);
447 }
448
449 static void
450 on_bye_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
451 {
452   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
453       src->ssrc);
454 }
455
456 static void
457 on_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
458 {
459   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_TIMEOUT], 0,
460       src->ssrc);
461 }
462
463 static void
464 on_sender_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
465 {
466   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
467       src->ssrc);
468 }
469
470 static void
471 on_new_sender_ssrc (RTPSession * session, RTPSource * src, GstRtpSession * sess)
472 {
473   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC], 0,
474       src->ssrc);
475 }
476
477 static void
478 on_sender_ssrc_active (RTPSession * session, RTPSource * src,
479     GstRtpSession * sess)
480 {
481   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE], 0,
482       src->ssrc);
483 }
484
485 static void
486 on_notify_stats (RTPSession * session, GParamSpec * spec,
487     GstRtpSession * rtpsession)
488 {
489   g_object_notify (G_OBJECT (rtpsession), "stats");
490 }
491
492 #define gst_rtp_session_parent_class parent_class
493 G_DEFINE_TYPE_WITH_PRIVATE (GstRtpSession, gst_rtp_session, GST_TYPE_ELEMENT);
494
495 static void
496 gst_rtp_session_class_init (GstRtpSessionClass * klass)
497 {
498   GObjectClass *gobject_class;
499   GstElementClass *gstelement_class;
500
501   gobject_class = (GObjectClass *) klass;
502   gstelement_class = (GstElementClass *) klass;
503
504   gobject_class->finalize = gst_rtp_session_finalize;
505   gobject_class->set_property = gst_rtp_session_set_property;
506   gobject_class->get_property = gst_rtp_session_get_property;
507
508   /**
509    * GstRtpSession::request-pt-map:
510    * @sess: the object which received the signal
511    * @pt: the pt
512    *
513    * Request the payload type as #GstCaps for @pt.
514    */
515   gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP] =
516       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
517       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, request_pt_map),
518       NULL, NULL, NULL, GST_TYPE_CAPS, 1, G_TYPE_UINT);
519   /**
520    * GstRtpSession::clear-pt-map:
521    * @sess: the object which received the signal
522    *
523    * Clear the cached pt-maps requested with #GstRtpSession::request-pt-map.
524    */
525   gst_rtp_session_signals[SIGNAL_CLEAR_PT_MAP] =
526       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
527       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
528       G_STRUCT_OFFSET (GstRtpSessionClass, clear_pt_map),
529       NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
530
531   /**
532    * GstRtpSession::on-new-ssrc:
533    * @sess: the object which received the signal
534    * @ssrc: the SSRC
535    *
536    * Notify of a new SSRC that entered @session.
537    */
538   gst_rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
539       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
540       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_new_ssrc),
541       NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
542   /**
543    * GstRtpSession::on-ssrc_collision:
544    * @sess: the object which received the signal
545    * @ssrc: the SSRC
546    *
547    * Notify when we have an SSRC collision
548    */
549   gst_rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
550       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
551       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
552           on_ssrc_collision), NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
553   /**
554    * GstRtpSession::on-ssrc_validated:
555    * @sess: the object which received the signal
556    * @ssrc: the SSRC
557    *
558    * Notify of a new SSRC that became validated.
559    */
560   gst_rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
561       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
562       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
563           on_ssrc_validated), NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
564   /**
565    * GstRtpSession::on-ssrc-active:
566    * @sess: the object which received the signal
567    * @ssrc: the SSRC
568    *
569    * Notify of a SSRC that is active, i.e., sending RTCP.
570    */
571   gst_rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
572       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
573       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
574           on_ssrc_active), NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
575   /**
576    * GstRtpSession::on-ssrc-sdes:
577    * @session: the object which received the signal
578    * @src: the SSRC
579    *
580    * Notify that a new SDES was received for SSRC.
581    */
582   gst_rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
583       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
584       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_ssrc_sdes),
585       NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
586
587   /**
588    * GstRtpSession::on-bye-ssrc:
589    * @sess: the object which received the signal
590    * @ssrc: the SSRC
591    *
592    * Notify of an SSRC that became inactive because of a BYE packet.
593    */
594   gst_rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
595       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
596       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_bye_ssrc),
597       NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
598   /**
599    * GstRtpSession::on-bye-timeout:
600    * @sess: the object which received the signal
601    * @ssrc: the SSRC
602    *
603    * Notify of an SSRC that has timed out because of BYE
604    */
605   gst_rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
606       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
607       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_bye_timeout),
608       NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
609   /**
610    * GstRtpSession::on-timeout:
611    * @sess: the object which received the signal
612    * @ssrc: the SSRC
613    *
614    * Notify of an SSRC that has timed out
615    */
616   gst_rtp_session_signals[SIGNAL_ON_TIMEOUT] =
617       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
618       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_timeout),
619       NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
620   /**
621    * GstRtpSession::on-sender-timeout:
622    * @sess: the object which received the signal
623    * @ssrc: the SSRC
624    *
625    * Notify of a sender SSRC that has timed out and became a receiver
626    */
627   gst_rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
628       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
629       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
630           on_sender_timeout), NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
631
632   /**
633    * GstRtpSession::on-new-sender-ssrc:
634    * @sess: the object which received the signal
635    * @ssrc: the sender SSRC
636    *
637    * Notify of a new sender SSRC that entered @session.
638    *
639    * Since: 1.8
640    */
641   gst_rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC] =
642       g_signal_new ("on-new-sender-ssrc", G_TYPE_FROM_CLASS (klass),
643       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_new_ssrc),
644       NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
645
646   /**
647    * GstRtpSession::on-sender-ssrc-active:
648    * @sess: the object which received the signal
649    * @ssrc: the sender SSRC
650    *
651    * Notify of a sender SSRC that is active, i.e., sending RTCP.
652    *
653    * Since: 1.8
654    */
655   gst_rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE] =
656       g_signal_new ("on-sender-ssrc-active", G_TYPE_FROM_CLASS (klass),
657       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
658           on_ssrc_active), NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
659
660   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
661       g_param_spec_double ("bandwidth", "Bandwidth",
662           "The bandwidth of the session in bytes per second (0 for auto-discover)",
663           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
664           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
665
666   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
667       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
668           "The RTCP bandwidth of the session in bytes per second "
669           "(or as a real fraction of the RTP bandwidth if < 1.0)",
670           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
671           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
672
673   g_object_class_install_property (gobject_class, PROP_RTCP_RR_BANDWIDTH,
674       g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth",
675           "The RTCP bandwidth used for receivers in bytes per second (-1 = default)",
676           -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH,
677           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
678
679   g_object_class_install_property (gobject_class, PROP_RTCP_RS_BANDWIDTH,
680       g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth",
681           "The RTCP bandwidth used for senders in bytes per second (-1 = default)",
682           -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH,
683           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
684
685   g_object_class_install_property (gobject_class, PROP_SDES,
686       g_param_spec_boxed ("sdes", "SDES",
687           "The SDES items of this session",
688 #ifndef TIZEN_FEATURE_GST_UPSTREAM_AVOID_BUILD_BREAK
689           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS
690           | GST_PARAM_DOC_SHOW_DEFAULT));
691 #else
692           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
693 #endif
694
695   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
696       g_param_spec_uint ("num-sources", "Num Sources",
697           "The number of sources in the session", 0, G_MAXUINT,
698           DEFAULT_NUM_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
699
700   g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
701       g_param_spec_uint ("num-active-sources", "Num Active Sources",
702           "The number of active sources in the session", 0, G_MAXUINT,
703           DEFAULT_NUM_ACTIVE_SOURCES,
704           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
705
706   g_object_class_install_property (gobject_class, PROP_INTERNAL_SESSION,
707       g_param_spec_object ("internal-session", "Internal Session",
708           "The internal RTPSession object", RTP_TYPE_SESSION,
709           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
710
711   g_object_class_install_property (gobject_class, PROP_USE_PIPELINE_CLOCK,
712       g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock",
713           "Use the pipeline running-time to set the NTP time in the RTCP SR messages "
714           "(DEPRECATED: Use ntp-time-source property)",
715           DEFAULT_USE_PIPELINE_CLOCK,
716           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
717
718   g_object_class_install_property (gobject_class, PROP_RTCP_MIN_INTERVAL,
719       g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval",
720           "Minimum interval between Regular RTCP packet (in ns)",
721           0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
722           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
723
724   g_object_class_install_property (gobject_class, PROP_PROBATION,
725       g_param_spec_uint ("probation", "Number of probations",
726           "Consecutive packet sequence numbers to accept the source",
727           0, G_MAXUINT, DEFAULT_PROBATION,
728           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
729
730   g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
731       g_param_spec_uint ("max-dropout-time", "Max dropout time",
732           "The maximum time (milliseconds) of missing packets tolerated.",
733           0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
734           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
735
736   g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
737       g_param_spec_uint ("max-misorder-time", "Max misorder time",
738           "The maximum time (milliseconds) of misordered packets tolerated.",
739           0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
740           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
741
742   /**
743    * GstRtpSession:stats:
744    *
745    * Various session statistics. This property returns a #GstStructure
746    * with name `application/x-rtp-session-stats` with the following fields:
747    *
748    * * "recv-rtx-req-count"  G_TYPE_UINT   The number of retransmission events
749    *      received from downstream (in receiver mode) (Since 1.16)
750    * * "sent-rtx-req-count" G_TYPE_UINT   The number of retransmission events
751    *      sent downstream (in sender mode) (Since 1.16)
752    * * "rtx-count"          G_TYPE_UINT   DEPRECATED Since 1.16, same as
753    *      "recv-rtx-req-count".
754    * * "rtx-drop-count"     G_TYPE_UINT   The number of retransmission events
755    *      dropped (due to bandwidth constraints)
756    * * "sent-nack-count"    G_TYPE_UINT   Number of NACKs sent
757    * * "recv-nack-count"    G_TYPE_UINT   Number of NACKs received
758    * * "source-stats"       G_TYPE_BOXED  GValueArray of #RTPSource:stats for all
759    *      RTP sources (Since 1.8)
760    *
761    * Since: 1.4
762    */
763   g_object_class_install_property (gobject_class, PROP_STATS,
764       g_param_spec_boxed ("stats", "Statistics",
765           "Various statistics", GST_TYPE_STRUCTURE,
766           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
767
768   /**
769    * GstRtpSession:twcc-stats:
770    *
771    * Various statistics derived from TWCC. This property returns a GstStructure
772    * with name RTPTWCCStats with the following fields:
773    *
774    *  "bitrate-sent"     G_TYPE_UINT    The actual sent bitrate of TWCC packets
775    *  "bitrate-recv"     G_TYPE_UINT    The estimated bitrate for the receiver.
776    *  "packets-sent"     G_TYPE_UINT    Number of packets sent
777    *  "packets-recv"     G_TYPE_UINT    Number of packets reported recevied
778    *  "packet-loss-pct"  G_TYPE_DOUBLE  Packetloss percentage, based on
779    *      packets reported as lost from the recevier.
780    *  "avg-delta-of-delta", G_TYPE_INT64 In nanoseconds, a moving window
781    *      average of the difference in inter-packet spacing between
782    *      sender and receiver. A sudden increase in this number can indicate
783    *      network congestion.
784    *
785    * Since: 1.18
786    */
787   g_object_class_install_property (gobject_class, PROP_TWCC_STATS,
788       g_param_spec_boxed ("twcc-stats", "TWCC Statistics",
789           "Various statistics from TWCC", GST_TYPE_STRUCTURE,
790           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
791
792   g_object_class_install_property (gobject_class, PROP_RTP_PROFILE,
793       g_param_spec_enum ("rtp-profile", "RTP Profile",
794           "RTP profile to use", GST_TYPE_RTP_PROFILE, DEFAULT_RTP_PROFILE,
795           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
796
797   g_object_class_install_property (gobject_class, PROP_NTP_TIME_SOURCE,
798       g_param_spec_enum ("ntp-time-source", "NTP Time Source",
799           "NTP time source for RTCP packets",
800           GST_TYPE_RTP_NTP_TIME_SOURCE, DEFAULT_NTP_TIME_SOURCE,
801           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
802
803   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_SEND_TIME,
804       g_param_spec_boolean ("rtcp-sync-send-time", "RTCP Sync Send Time",
805           "Use send time or capture time for RTCP sync "
806           "(TRUE = send time, FALSE = capture time)",
807           DEFAULT_RTCP_SYNC_SEND_TIME,
808           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
809
810   gstelement_class->change_state =
811       GST_DEBUG_FUNCPTR (gst_rtp_session_change_state);
812   gstelement_class->request_new_pad =
813       GST_DEBUG_FUNCPTR (gst_rtp_session_request_new_pad);
814   gstelement_class->release_pad =
815       GST_DEBUG_FUNCPTR (gst_rtp_session_release_pad);
816
817   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_session_clear_pt_map);
818
819   /* sink pads */
820   gst_element_class_add_static_pad_template (gstelement_class,
821       &rtpsession_recv_rtp_sink_template);
822   gst_element_class_add_static_pad_template (gstelement_class,
823       &rtpsession_recv_rtcp_sink_template);
824   gst_element_class_add_static_pad_template (gstelement_class,
825       &rtpsession_send_rtp_sink_template);
826
827   /* src pads */
828   gst_element_class_add_static_pad_template (gstelement_class,
829       &rtpsession_recv_rtp_src_template);
830   gst_element_class_add_static_pad_template (gstelement_class,
831       &rtpsession_sync_src_template);
832   gst_element_class_add_static_pad_template (gstelement_class,
833       &rtpsession_send_rtp_src_template);
834   gst_element_class_add_static_pad_template (gstelement_class,
835       &rtpsession_send_rtcp_src_template);
836
837   gst_element_class_set_static_metadata (gstelement_class, "RTP Session",
838       "Filter/Network/RTP",
839       "Implement an RTP session", "Wim Taymans <wim.taymans@gmail.com>");
840
841   GST_DEBUG_CATEGORY_INIT (gst_rtp_session_debug,
842       "rtpsession", 0, "RTP Session");
843
844   GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_recv_rtp);
845   GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_recv_rtp_list);
846   GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_recv_rtcp);
847   GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_send_rtp);
848   GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_send_rtp_list);
849
850 #ifndef TIZEN_FEATURE_GST_UPSTREAM_AVOID_BUILD_BREAK
851   gst_type_mark_as_plugin_api (GST_TYPE_RTP_NTP_TIME_SOURCE, 0);
852   gst_type_mark_as_plugin_api (RTP_TYPE_SESSION, 0);
853   gst_type_mark_as_plugin_api (RTP_TYPE_SOURCE, 0);
854 #endif
855 }
856
857 static void
858 gst_rtp_session_init (GstRtpSession * rtpsession)
859 {
860   rtpsession->priv = gst_rtp_session_get_instance_private (rtpsession);
861   g_mutex_init (&rtpsession->priv->lock);
862   g_cond_init (&rtpsession->priv->cond);
863   rtpsession->priv->sysclock = gst_system_clock_obtain ();
864   rtpsession->priv->session = rtp_session_new ();
865   rtpsession->priv->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK;
866   rtpsession->priv->rtcp_sync_send_time = DEFAULT_RTCP_SYNC_SEND_TIME;
867
868   /* configure callbacks */
869   rtp_session_set_callbacks (rtpsession->priv->session, &callbacks, rtpsession);
870   /* configure signals */
871   g_signal_connect (rtpsession->priv->session, "on-new-ssrc",
872       (GCallback) on_new_ssrc, rtpsession);
873   g_signal_connect (rtpsession->priv->session, "on-ssrc-collision",
874       (GCallback) on_ssrc_collision, rtpsession);
875   g_signal_connect (rtpsession->priv->session, "on-ssrc-validated",
876       (GCallback) on_ssrc_validated, rtpsession);
877   g_signal_connect (rtpsession->priv->session, "on-ssrc-active",
878       (GCallback) on_ssrc_active, rtpsession);
879   g_signal_connect (rtpsession->priv->session, "on-ssrc-sdes",
880       (GCallback) on_ssrc_sdes, rtpsession);
881   g_signal_connect (rtpsession->priv->session, "on-bye-ssrc",
882       (GCallback) on_bye_ssrc, rtpsession);
883   g_signal_connect (rtpsession->priv->session, "on-bye-timeout",
884       (GCallback) on_bye_timeout, rtpsession);
885   g_signal_connect (rtpsession->priv->session, "on-timeout",
886       (GCallback) on_timeout, rtpsession);
887   g_signal_connect (rtpsession->priv->session, "on-sender-timeout",
888       (GCallback) on_sender_timeout, rtpsession);
889   g_signal_connect (rtpsession->priv->session, "on-new-sender-ssrc",
890       (GCallback) on_new_sender_ssrc, rtpsession);
891   g_signal_connect (rtpsession->priv->session, "on-sender-ssrc-active",
892       (GCallback) on_sender_ssrc_active, rtpsession);
893   g_signal_connect (rtpsession->priv->session, "notify::stats",
894       (GCallback) on_notify_stats, rtpsession);
895   rtpsession->priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
896       (GDestroyNotify) gst_caps_unref);
897
898   rtpsession->recv_rtcp_segment_seqnum = GST_SEQNUM_INVALID;
899
900   gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
901   gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
902
903   rtpsession->priv->thread_stopped = TRUE;
904
905   rtpsession->priv->recv_rtx_req_count = 0;
906   rtpsession->priv->sent_rtx_req_count = 0;
907
908   rtpsession->priv->ntp_time_source = DEFAULT_NTP_TIME_SOURCE;
909 }
910
911 static void
912 gst_rtp_session_finalize (GObject * object)
913 {
914   GstRtpSession *rtpsession;
915
916   rtpsession = GST_RTP_SESSION (object);
917
918   g_hash_table_destroy (rtpsession->priv->ptmap);
919   g_mutex_clear (&rtpsession->priv->lock);
920   g_cond_clear (&rtpsession->priv->cond);
921   g_object_unref (rtpsession->priv->sysclock);
922   g_object_unref (rtpsession->priv->session);
923   if (rtpsession->priv->last_twcc_stats)
924     gst_structure_free (rtpsession->priv->last_twcc_stats);
925
926   G_OBJECT_CLASS (parent_class)->finalize (object);
927 }
928
929 static void
930 gst_rtp_session_set_property (GObject * object, guint prop_id,
931     const GValue * value, GParamSpec * pspec)
932 {
933   GstRtpSession *rtpsession;
934   GstRtpSessionPrivate *priv;
935
936   rtpsession = GST_RTP_SESSION (object);
937   priv = rtpsession->priv;
938
939   switch (prop_id) {
940     case PROP_BANDWIDTH:
941       g_object_set_property (G_OBJECT (priv->session), "bandwidth", value);
942       break;
943     case PROP_RTCP_FRACTION:
944       g_object_set_property (G_OBJECT (priv->session), "rtcp-fraction", value);
945       break;
946     case PROP_RTCP_RR_BANDWIDTH:
947       g_object_set_property (G_OBJECT (priv->session), "rtcp-rr-bandwidth",
948           value);
949       break;
950     case PROP_RTCP_RS_BANDWIDTH:
951       g_object_set_property (G_OBJECT (priv->session), "rtcp-rs-bandwidth",
952           value);
953       break;
954     case PROP_SDES:
955       rtp_session_set_sdes_struct (priv->session, g_value_get_boxed (value));
956       break;
957     case PROP_USE_PIPELINE_CLOCK:
958       priv->use_pipeline_clock = g_value_get_boolean (value);
959       break;
960     case PROP_RTCP_MIN_INTERVAL:
961       g_object_set_property (G_OBJECT (priv->session), "rtcp-min-interval",
962           value);
963       break;
964     case PROP_PROBATION:
965       g_object_set_property (G_OBJECT (priv->session), "probation", value);
966       break;
967     case PROP_MAX_DROPOUT_TIME:
968       g_object_set_property (G_OBJECT (priv->session), "max-dropout-time",
969           value);
970       break;
971     case PROP_MAX_MISORDER_TIME:
972       g_object_set_property (G_OBJECT (priv->session), "max-misorder-time",
973           value);
974       break;
975     case PROP_RTP_PROFILE:
976       g_object_set_property (G_OBJECT (priv->session), "rtp-profile", value);
977       break;
978     case PROP_NTP_TIME_SOURCE:
979       priv->ntp_time_source = g_value_get_enum (value);
980       break;
981     case PROP_RTCP_SYNC_SEND_TIME:
982       priv->rtcp_sync_send_time = g_value_get_boolean (value);
983       break;
984     default:
985       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
986       break;
987   }
988 }
989
990 static void
991 gst_rtp_session_get_property (GObject * object, guint prop_id,
992     GValue * value, GParamSpec * pspec)
993 {
994   GstRtpSession *rtpsession;
995   GstRtpSessionPrivate *priv;
996
997   rtpsession = GST_RTP_SESSION (object);
998   priv = rtpsession->priv;
999
1000   switch (prop_id) {
1001     case PROP_BANDWIDTH:
1002       g_object_get_property (G_OBJECT (priv->session), "bandwidth", value);
1003       break;
1004     case PROP_RTCP_FRACTION:
1005       g_object_get_property (G_OBJECT (priv->session), "rtcp-fraction", value);
1006       break;
1007     case PROP_RTCP_RR_BANDWIDTH:
1008       g_object_get_property (G_OBJECT (priv->session), "rtcp-rr-bandwidth",
1009           value);
1010       break;
1011     case PROP_RTCP_RS_BANDWIDTH:
1012       g_object_get_property (G_OBJECT (priv->session), "rtcp-rs-bandwidth",
1013           value);
1014       break;
1015     case PROP_SDES:
1016       g_value_take_boxed (value, rtp_session_get_sdes_struct (priv->session));
1017       break;
1018     case PROP_NUM_SOURCES:
1019       g_value_set_uint (value, rtp_session_get_num_sources (priv->session));
1020       break;
1021     case PROP_NUM_ACTIVE_SOURCES:
1022       g_value_set_uint (value,
1023           rtp_session_get_num_active_sources (priv->session));
1024       break;
1025     case PROP_INTERNAL_SESSION:
1026       g_value_set_object (value, priv->session);
1027       break;
1028     case PROP_USE_PIPELINE_CLOCK:
1029       g_value_set_boolean (value, priv->use_pipeline_clock);
1030       break;
1031     case PROP_RTCP_MIN_INTERVAL:
1032       g_object_get_property (G_OBJECT (priv->session), "rtcp-min-interval",
1033           value);
1034       break;
1035     case PROP_PROBATION:
1036       g_object_get_property (G_OBJECT (priv->session), "probation", value);
1037       break;
1038     case PROP_MAX_DROPOUT_TIME:
1039       g_object_get_property (G_OBJECT (priv->session), "max-dropout-time",
1040           value);
1041       break;
1042     case PROP_MAX_MISORDER_TIME:
1043       g_object_get_property (G_OBJECT (priv->session), "max-misorder-time",
1044           value);
1045       break;
1046     case PROP_STATS:
1047       g_value_take_boxed (value, gst_rtp_session_create_stats (rtpsession));
1048       break;
1049     case PROP_TWCC_STATS:
1050       GST_RTP_SESSION_LOCK (rtpsession);
1051       g_value_set_boxed (value, priv->last_twcc_stats);
1052       GST_RTP_SESSION_UNLOCK (rtpsession);
1053       break;
1054     case PROP_RTP_PROFILE:
1055       g_object_get_property (G_OBJECT (priv->session), "rtp-profile", value);
1056       break;
1057     case PROP_NTP_TIME_SOURCE:
1058       g_value_set_enum (value, priv->ntp_time_source);
1059       break;
1060     case PROP_RTCP_SYNC_SEND_TIME:
1061       g_value_set_boolean (value, priv->rtcp_sync_send_time);
1062       break;
1063     default:
1064       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1065       break;
1066   }
1067 }
1068
1069 static GstStructure *
1070 gst_rtp_session_create_stats (GstRtpSession * rtpsession)
1071 {
1072   GstStructure *s;
1073
1074   g_object_get (rtpsession->priv->session, "stats", &s, NULL);
1075   gst_structure_set (s, "rtx-count", G_TYPE_UINT,
1076       rtpsession->priv->recv_rtx_req_count, "recv-rtx-req-count", G_TYPE_UINT,
1077       rtpsession->priv->recv_rtx_req_count, "sent-rtx-req-count", G_TYPE_UINT,
1078       rtpsession->priv->sent_rtx_req_count, NULL);
1079
1080   return s;
1081 }
1082
1083 static void
1084 get_current_times (GstRtpSession * rtpsession, GstClockTime * running_time,
1085     guint64 * ntpnstime)
1086 {
1087   guint64 ntpns = -1;
1088   GstClock *clock;
1089   GstClockTime base_time, rt, clock_time;
1090
1091   GST_OBJECT_LOCK (rtpsession);
1092   if ((clock = GST_ELEMENT_CLOCK (rtpsession))) {
1093     base_time = GST_ELEMENT_CAST (rtpsession)->base_time;
1094     gst_object_ref (clock);
1095     GST_OBJECT_UNLOCK (rtpsession);
1096
1097     /* get current clock time and convert to running time */
1098     clock_time = gst_clock_get_time (clock);
1099     rt = clock_time - base_time;
1100
1101     if (rtpsession->priv->use_pipeline_clock) {
1102       ntpns = rt;
1103       /* add constant to convert from 1970 based time to 1900 based time */
1104       ntpns += (2208988800LL * GST_SECOND);
1105     } else {
1106       switch (rtpsession->priv->ntp_time_source) {
1107         case GST_RTP_NTP_TIME_SOURCE_NTP:
1108         case GST_RTP_NTP_TIME_SOURCE_UNIX:{
1109           /* get current NTP time */
1110           ntpns = g_get_real_time () * GST_USECOND;
1111
1112           /* add constant to convert from 1970 based time to 1900 based time */
1113           if (rtpsession->priv->ntp_time_source == GST_RTP_NTP_TIME_SOURCE_NTP)
1114             ntpns += (2208988800LL * GST_SECOND);
1115           break;
1116         }
1117         case GST_RTP_NTP_TIME_SOURCE_RUNNING_TIME:
1118           ntpns = rt;
1119           break;
1120         case GST_RTP_NTP_TIME_SOURCE_CLOCK_TIME:
1121           ntpns = clock_time;
1122           break;
1123         default:
1124           ntpns = -1;
1125           g_assert_not_reached ();
1126           break;
1127       }
1128     }
1129
1130     gst_object_unref (clock);
1131   } else {
1132     GST_OBJECT_UNLOCK (rtpsession);
1133     rt = -1;
1134     ntpns = -1;
1135   }
1136   if (running_time)
1137     *running_time = rt;
1138   if (ntpnstime)
1139     *ntpnstime = ntpns;
1140 }
1141
1142 /* must be called with GST_RTP_SESSION_LOCK */
1143 static void
1144 signal_waiting_rtcp_thread_unlocked (GstRtpSession * rtpsession)
1145 {
1146   if (rtpsession->priv->wait_send) {
1147     GST_LOG_OBJECT (rtpsession, "signal RTCP thread");
1148     rtpsession->priv->wait_send = FALSE;
1149     GST_RTP_SESSION_SIGNAL (rtpsession);
1150   }
1151 }
1152
1153 static void
1154 rtcp_thread (GstRtpSession * rtpsession)
1155 {
1156   GstClockID id;
1157   GstClockTime current_time;
1158   GstClockTime next_timeout;
1159   guint64 ntpnstime;
1160   GstClockTime running_time;
1161   RTPSession *session;
1162   GstClock *sysclock;
1163
1164   GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread");
1165
1166   GST_RTP_SESSION_LOCK (rtpsession);
1167
1168   while (rtpsession->priv->wait_send) {
1169     GST_LOG_OBJECT (rtpsession, "waiting for getting started");
1170     GST_RTP_SESSION_WAIT (rtpsession);
1171     GST_LOG_OBJECT (rtpsession, "signaled...");
1172   }
1173
1174   sysclock = rtpsession->priv->sysclock;
1175   current_time = gst_clock_get_time (sysclock);
1176
1177   session = rtpsession->priv->session;
1178
1179   GST_DEBUG_OBJECT (rtpsession, "starting at %" GST_TIME_FORMAT,
1180       GST_TIME_ARGS (current_time));
1181   session->start_time = current_time;
1182
1183   while (!rtpsession->priv->stop_thread) {
1184     GstClockReturn res;
1185
1186     /* get initial estimate */
1187     next_timeout = rtp_session_next_timeout (session, current_time);
1188
1189     GST_DEBUG_OBJECT (rtpsession, "next check time %" GST_TIME_FORMAT,
1190         GST_TIME_ARGS (next_timeout));
1191
1192     /* leave if no more timeouts, the session ended */
1193     if (next_timeout == GST_CLOCK_TIME_NONE)
1194       break;
1195
1196     id = rtpsession->priv->id =
1197         gst_clock_new_single_shot_id (sysclock, next_timeout);
1198     GST_RTP_SESSION_UNLOCK (rtpsession);
1199
1200     res = gst_clock_id_wait (id, NULL);
1201
1202     GST_RTP_SESSION_LOCK (rtpsession);
1203     gst_clock_id_unref (id);
1204     rtpsession->priv->id = NULL;
1205
1206     if (rtpsession->priv->stop_thread)
1207       break;
1208
1209     /* update current time */
1210     current_time = gst_clock_get_time (sysclock);
1211
1212     /* get current NTP time */
1213     get_current_times (rtpsession, &running_time, &ntpnstime);
1214
1215     /* we get unlocked because we need to perform reconsideration, don't perform
1216      * the timeout but get a new reporting estimate. */
1217     GST_DEBUG_OBJECT (rtpsession, "unlocked %d, current %" GST_TIME_FORMAT,
1218         res, GST_TIME_ARGS (current_time));
1219
1220     /* perform actions, we ignore result. Release lock because it might push. */
1221     GST_RTP_SESSION_UNLOCK (rtpsession);
1222     rtp_session_on_timeout (session, current_time, ntpnstime, running_time);
1223     GST_RTP_SESSION_LOCK (rtpsession);
1224   }
1225   /* mark the thread as stopped now */
1226   rtpsession->priv->thread_stopped = TRUE;
1227   GST_RTP_SESSION_UNLOCK (rtpsession);
1228
1229   GST_DEBUG_OBJECT (rtpsession, "leaving RTCP thread");
1230 }
1231
1232 static gboolean
1233 start_rtcp_thread (GstRtpSession * rtpsession)
1234 {
1235   GError *error = NULL;
1236   gboolean res;
1237
1238   GST_DEBUG_OBJECT (rtpsession, "starting RTCP thread");
1239
1240   GST_RTP_SESSION_LOCK (rtpsession);
1241   rtpsession->priv->stop_thread = FALSE;
1242   if (rtpsession->priv->thread_stopped) {
1243     /* if the thread stopped, and we still have a handle to the thread, join it
1244      * now. We can safely join with the lock held, the thread will not take it
1245      * anymore. */
1246     if (rtpsession->priv->thread)
1247       g_thread_join (rtpsession->priv->thread);
1248     /* only create a new thread if the old one was stopped. Otherwise we can
1249      * just reuse the currently running one. */
1250     rtpsession->priv->thread = g_thread_try_new ("rtpsession-rtcp",
1251         (GThreadFunc) rtcp_thread, rtpsession, &error);
1252     rtpsession->priv->thread_stopped = FALSE;
1253   }
1254   GST_RTP_SESSION_UNLOCK (rtpsession);
1255
1256   if (error != NULL) {
1257     res = FALSE;
1258     GST_DEBUG_OBJECT (rtpsession, "failed to start thread, %s", error->message);
1259     g_error_free (error);
1260   } else {
1261     res = TRUE;
1262   }
1263   return res;
1264 }
1265
1266 static void
1267 stop_rtcp_thread (GstRtpSession * rtpsession)
1268 {
1269   GST_DEBUG_OBJECT (rtpsession, "stopping RTCP thread");
1270
1271   GST_RTP_SESSION_LOCK (rtpsession);
1272   rtpsession->priv->stop_thread = TRUE;
1273   signal_waiting_rtcp_thread_unlocked (rtpsession);
1274   if (rtpsession->priv->id)
1275     gst_clock_id_unschedule (rtpsession->priv->id);
1276   GST_RTP_SESSION_UNLOCK (rtpsession);
1277 }
1278
1279 static void
1280 join_rtcp_thread (GstRtpSession * rtpsession)
1281 {
1282   GST_RTP_SESSION_LOCK (rtpsession);
1283   /* don't try to join when we have no thread */
1284   if (rtpsession->priv->thread != NULL) {
1285     GST_DEBUG_OBJECT (rtpsession, "joining RTCP thread");
1286     GST_RTP_SESSION_UNLOCK (rtpsession);
1287
1288     g_thread_join (rtpsession->priv->thread);
1289
1290     GST_RTP_SESSION_LOCK (rtpsession);
1291     /* after the join, take the lock and clear the thread structure. The caller
1292      * is supposed to not concurrently call start and join. */
1293     rtpsession->priv->thread = NULL;
1294   }
1295   GST_RTP_SESSION_UNLOCK (rtpsession);
1296 }
1297
1298 static GstStateChangeReturn
1299 gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
1300 {
1301   GstStateChangeReturn res;
1302   GstRtpSession *rtpsession;
1303
1304   rtpsession = GST_RTP_SESSION (element);
1305
1306   switch (transition) {
1307     case GST_STATE_CHANGE_NULL_TO_READY:
1308       break;
1309     case GST_STATE_CHANGE_READY_TO_PAUSED:
1310       GST_RTP_SESSION_LOCK (rtpsession);
1311       rtpsession->priv->wait_send = TRUE;
1312       GST_RTP_SESSION_UNLOCK (rtpsession);
1313       break;
1314     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1315       break;
1316     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1317     case GST_STATE_CHANGE_PAUSED_TO_READY:
1318       /* no need to join yet, we might want to continue later. Also, the
1319        * dataflow could block downstream so that a join could just block
1320        * forever. */
1321       stop_rtcp_thread (rtpsession);
1322       break;
1323     default:
1324       break;
1325   }
1326
1327   res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1328
1329   switch (transition) {
1330     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1331       if (!start_rtcp_thread (rtpsession))
1332         goto failed_thread;
1333       break;
1334     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1335       break;
1336     case GST_STATE_CHANGE_PAUSED_TO_READY:
1337       /* downstream is now releasing the dataflow and we can join. */
1338       join_rtcp_thread (rtpsession);
1339       rtp_session_reset (rtpsession->priv->session);
1340       break;
1341     case GST_STATE_CHANGE_READY_TO_NULL:
1342       break;
1343     default:
1344       break;
1345   }
1346   return res;
1347
1348   /* ERRORS */
1349 failed_thread:
1350   {
1351     return GST_STATE_CHANGE_FAILURE;
1352   }
1353 }
1354
1355 static gboolean
1356 return_true (gpointer key, gpointer value, gpointer user_data)
1357 {
1358   return TRUE;
1359 }
1360
1361 static void
1362 gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession)
1363 {
1364   GST_RTP_SESSION_LOCK (rtpsession);
1365   g_hash_table_foreach_remove (rtpsession->priv->ptmap, return_true, NULL);
1366   GST_RTP_SESSION_UNLOCK (rtpsession);
1367 }
1368
1369 /* called when the session manager has an RTP packet ready to be pushed */
1370 static GstFlowReturn
1371 gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src,
1372     GstBuffer * buffer, gpointer user_data)
1373 {
1374   GstFlowReturn result;
1375   GstRtpSession *rtpsession;
1376   GstPad *rtp_src;
1377
1378   rtpsession = GST_RTP_SESSION (user_data);
1379
1380   GST_RTP_SESSION_LOCK (rtpsession);
1381   if ((rtp_src = rtpsession->recv_rtp_src))
1382     gst_object_ref (rtp_src);
1383   GST_RTP_SESSION_UNLOCK (rtpsession);
1384
1385   if (rtp_src) {
1386     if (rtpsession->priv->processed_list) {
1387       GST_LOG_OBJECT (rtpsession, "queueing received RTP packet");
1388       gst_buffer_list_add (rtpsession->priv->processed_list, buffer);
1389       result = GST_FLOW_OK;
1390     } else {
1391       GST_LOG_OBJECT (rtpsession, "pushing received RTP packet");
1392       result = gst_pad_push (rtp_src, buffer);
1393     }
1394     gst_object_unref (rtp_src);
1395   } else {
1396     GST_DEBUG_OBJECT (rtpsession, "dropping received RTP packet");
1397     gst_buffer_unref (buffer);
1398     result = GST_FLOW_OK;
1399   }
1400   return result;
1401 }
1402
1403 /* called when the session manager has an RTP packet ready for further
1404  * sending */
1405 static GstFlowReturn
1406 gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src,
1407     gpointer data, gpointer user_data)
1408 {
1409   GstFlowReturn result;
1410   GstRtpSession *rtpsession;
1411   GstPad *rtp_src;
1412
1413   rtpsession = GST_RTP_SESSION (user_data);
1414
1415   GST_RTP_SESSION_LOCK (rtpsession);
1416   if ((rtp_src = rtpsession->send_rtp_src))
1417     gst_object_ref (rtp_src);
1418   signal_waiting_rtcp_thread_unlocked (rtpsession);
1419   GST_RTP_SESSION_UNLOCK (rtpsession);
1420
1421   if (rtp_src) {
1422     if (GST_IS_BUFFER (data)) {
1423       GST_LOG_OBJECT (rtpsession, "sending RTP packet");
1424       result = gst_pad_push (rtp_src, GST_BUFFER_CAST (data));
1425     } else {
1426       GST_LOG_OBJECT (rtpsession, "sending RTP list");
1427       result = gst_pad_push_list (rtp_src, GST_BUFFER_LIST_CAST (data));
1428     }
1429     gst_object_unref (rtp_src);
1430   } else {
1431     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1432     result = GST_FLOW_OK;
1433   }
1434   return result;
1435 }
1436
1437 static void
1438 do_rtcp_events (GstRtpSession * rtpsession, GstPad * srcpad)
1439 {
1440   GstCaps *caps;
1441   GstSegment seg;
1442   GstEvent *event;
1443   gchar *stream_id;
1444   gboolean have_group_id;
1445   guint group_id;
1446
1447   stream_id =
1448       g_strdup_printf ("%08x%08x%08x%08x", g_random_int (), g_random_int (),
1449       g_random_int (), g_random_int ());
1450
1451   GST_RTP_SESSION_LOCK (rtpsession);
1452   if (rtpsession->recv_rtp_sink) {
1453     event =
1454         gst_pad_get_sticky_event (rtpsession->recv_rtp_sink,
1455         GST_EVENT_STREAM_START, 0);
1456     if (event) {
1457       if (gst_event_parse_group_id (event, &group_id))
1458         have_group_id = TRUE;
1459       else
1460         have_group_id = FALSE;
1461       gst_event_unref (event);
1462     } else {
1463       have_group_id = TRUE;
1464       group_id = gst_util_group_id_next ();
1465     }
1466   } else {
1467     have_group_id = TRUE;
1468     group_id = gst_util_group_id_next ();
1469   }
1470   GST_RTP_SESSION_UNLOCK (rtpsession);
1471
1472   event = gst_event_new_stream_start (stream_id);
1473   rtpsession->recv_rtcp_segment_seqnum = gst_event_get_seqnum (event);
1474   gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum);
1475   if (have_group_id)
1476     gst_event_set_group_id (event, group_id);
1477   gst_pad_push_event (srcpad, event);
1478   g_free (stream_id);
1479
1480   caps = gst_caps_new_empty_simple ("application/x-rtcp");
1481   gst_pad_set_caps (srcpad, caps);
1482   gst_caps_unref (caps);
1483
1484   gst_segment_init (&seg, GST_FORMAT_TIME);
1485   event = gst_event_new_segment (&seg);
1486   gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum);
1487   gst_pad_push_event (srcpad, event);
1488 }
1489
1490 /* called when the session manager has an RTCP packet ready for further
1491  * sending. The eos flag is set when an EOS event should be sent downstream as
1492  * well. */
1493 static GstFlowReturn
1494 gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src,
1495     GstBuffer * buffer, gboolean all_sources_bye, gpointer user_data)
1496 {
1497   GstFlowReturn result;
1498   GstRtpSession *rtpsession;
1499   GstPad *rtcp_src;
1500
1501   rtpsession = GST_RTP_SESSION (user_data);
1502
1503   GST_RTP_SESSION_LOCK (rtpsession);
1504   if (rtpsession->priv->stop_thread)
1505     goto stopping;
1506
1507   if ((rtcp_src = rtpsession->send_rtcp_src)) {
1508     gst_object_ref (rtcp_src);
1509     GST_RTP_SESSION_UNLOCK (rtpsession);
1510
1511     /* set rtcp caps on output pad */
1512     if (!gst_pad_has_current_caps (rtcp_src))
1513       do_rtcp_events (rtpsession, rtcp_src);
1514
1515     GST_LOG_OBJECT (rtpsession, "sending RTCP");
1516     result = gst_pad_push (rtcp_src, buffer);
1517
1518     /* Forward send an EOS on the RTCP sink if we received an EOS on the
1519      * send_rtp_sink. We don't need to check the recv_rtp_sink since in this
1520      * case the EOS event would already have been sent */
1521     if (all_sources_bye && rtpsession->send_rtp_sink &&
1522         GST_PAD_IS_EOS (rtpsession->send_rtp_sink)) {
1523       GstEvent *event;
1524
1525       GST_LOG_OBJECT (rtpsession, "sending EOS");
1526
1527       event = gst_event_new_eos ();
1528       gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum);
1529       gst_pad_push_event (rtcp_src, event);
1530     }
1531     gst_object_unref (rtcp_src);
1532   } else {
1533     GST_RTP_SESSION_UNLOCK (rtpsession);
1534
1535     GST_DEBUG_OBJECT (rtpsession, "not sending RTCP, no output pad");
1536     gst_buffer_unref (buffer);
1537     result = GST_FLOW_OK;
1538   }
1539   return result;
1540
1541   /* ERRORS */
1542 stopping:
1543   {
1544     GST_DEBUG_OBJECT (rtpsession, "we are stopping");
1545     gst_buffer_unref (buffer);
1546     GST_RTP_SESSION_UNLOCK (rtpsession);
1547     return GST_FLOW_OK;
1548   }
1549 }
1550
1551 /* called when the session manager has an SR RTCP packet ready for handling
1552  * inter stream synchronisation */
1553 static GstFlowReturn
1554 gst_rtp_session_sync_rtcp (RTPSession * sess,
1555     GstBuffer * buffer, gpointer user_data)
1556 {
1557   GstFlowReturn result;
1558   GstRtpSession *rtpsession;
1559   GstPad *sync_src;
1560
1561   rtpsession = GST_RTP_SESSION (user_data);
1562
1563   GST_RTP_SESSION_LOCK (rtpsession);
1564   if (rtpsession->priv->stop_thread)
1565     goto stopping;
1566
1567   if ((sync_src = rtpsession->sync_src)) {
1568     gst_object_ref (sync_src);
1569     GST_RTP_SESSION_UNLOCK (rtpsession);
1570
1571     /* set rtcp caps on output pad, this happens
1572      * when we receive RTCP muxed with RTP according
1573      * to RFC5761. Otherwise we would have forwarded
1574      * the events from the recv_rtcp_sink pad already
1575      */
1576     if (!gst_pad_has_current_caps (sync_src))
1577       do_rtcp_events (rtpsession, sync_src);
1578
1579     GST_LOG_OBJECT (rtpsession, "sending Sync RTCP");
1580     result = gst_pad_push (sync_src, buffer);
1581     gst_object_unref (sync_src);
1582   } else {
1583     GST_RTP_SESSION_UNLOCK (rtpsession);
1584
1585     GST_DEBUG_OBJECT (rtpsession, "not sending Sync RTCP, no output pad");
1586     gst_buffer_unref (buffer);
1587     result = GST_FLOW_OK;
1588   }
1589   return result;
1590
1591   /* ERRORS */
1592 stopping:
1593   {
1594     GST_DEBUG_OBJECT (rtpsession, "we are stopping");
1595     gst_buffer_unref (buffer);
1596     GST_RTP_SESSION_UNLOCK (rtpsession);
1597     return GST_FLOW_OK;
1598   }
1599 }
1600
1601 static void
1602 gst_rtp_session_cache_caps (GstRtpSession * rtpsession, GstCaps * caps)
1603 {
1604   GstRtpSessionPrivate *priv;
1605   const GstStructure *s;
1606   gint payload;
1607
1608   priv = rtpsession->priv;
1609
1610   GST_DEBUG_OBJECT (rtpsession, "parsing caps");
1611
1612   s = gst_caps_get_structure (caps, 0);
1613
1614   if (!gst_structure_get_int (s, "payload", &payload))
1615     return;
1616
1617   if (g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (payload)))
1618     return;
1619
1620   rtp_session_update_recv_caps_structure (rtpsession->priv->session, s);
1621
1622   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (payload),
1623       gst_caps_ref (caps));
1624 }
1625
1626 static GstCaps *
1627 gst_rtp_session_get_caps_for_pt (GstRtpSession * rtpsession, guint payload)
1628 {
1629   GstCaps *caps = NULL;
1630   GValue args[2] = { {0}, {0} };
1631   GValue ret = { 0 };
1632
1633   GST_RTP_SESSION_LOCK (rtpsession);
1634   caps = g_hash_table_lookup (rtpsession->priv->ptmap,
1635       GINT_TO_POINTER (payload));
1636   if (caps) {
1637     gst_caps_ref (caps);
1638     goto done;
1639   }
1640
1641   /* not found in the cache, try to get it with a signal */
1642   g_value_init (&args[0], GST_TYPE_ELEMENT);
1643   g_value_set_object (&args[0], rtpsession);
1644   g_value_init (&args[1], G_TYPE_UINT);
1645   g_value_set_uint (&args[1], payload);
1646
1647   g_value_init (&ret, GST_TYPE_CAPS);
1648   g_value_set_boxed (&ret, NULL);
1649
1650   GST_RTP_SESSION_UNLOCK (rtpsession);
1651
1652   g_signal_emitv (args, gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP], 0,
1653       &ret);
1654
1655   GST_RTP_SESSION_LOCK (rtpsession);
1656
1657   g_value_unset (&args[0]);
1658   g_value_unset (&args[1]);
1659   caps = (GstCaps *) g_value_dup_boxed (&ret);
1660   g_value_unset (&ret);
1661   if (!caps)
1662     goto no_caps;
1663
1664   gst_rtp_session_cache_caps (rtpsession, caps);
1665
1666 done:
1667   GST_RTP_SESSION_UNLOCK (rtpsession);
1668
1669   return caps;
1670
1671 no_caps:
1672   {
1673     GST_DEBUG_OBJECT (rtpsession, "could not get caps");
1674     goto done;
1675   }
1676 }
1677
1678 /* called when the session manager needs the clock rate */
1679 static gint
1680 gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
1681     gpointer user_data)
1682 {
1683   gint result = -1;
1684   GstRtpSession *rtpsession;
1685   GstCaps *caps;
1686   const GstStructure *s;
1687
1688   rtpsession = GST_RTP_SESSION_CAST (user_data);
1689
1690   caps = gst_rtp_session_get_caps_for_pt (rtpsession, payload);
1691
1692   if (!caps)
1693     goto done;
1694
1695   s = gst_caps_get_structure (caps, 0);
1696   if (!gst_structure_get_int (s, "clock-rate", &result))
1697     goto no_clock_rate;
1698
1699   gst_caps_unref (caps);
1700
1701   GST_DEBUG_OBJECT (rtpsession, "parsed clock-rate %d", result);
1702
1703 done:
1704
1705   return result;
1706
1707   /* ERRORS */
1708 no_clock_rate:
1709   {
1710     gst_caps_unref (caps);
1711     GST_DEBUG_OBJECT (rtpsession, "No clock-rate in caps!");
1712     goto done;
1713   }
1714 }
1715
1716 /* called when the session manager asks us to reconsider the timeout */
1717 static void
1718 gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data)
1719 {
1720   GstRtpSession *rtpsession;
1721
1722   rtpsession = GST_RTP_SESSION_CAST (user_data);
1723
1724   GST_RTP_SESSION_LOCK (rtpsession);
1725   GST_DEBUG_OBJECT (rtpsession, "unlock timer for reconsideration");
1726   if (rtpsession->priv->id)
1727     gst_clock_id_unschedule (rtpsession->priv->id);
1728   GST_RTP_SESSION_UNLOCK (rtpsession);
1729 }
1730
1731 static gboolean
1732 gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstObject * parent,
1733     GstEvent * event)
1734 {
1735   GstRtpSession *rtpsession;
1736   gboolean ret = FALSE;
1737
1738   rtpsession = GST_RTP_SESSION (parent);
1739
1740   GST_DEBUG_OBJECT (rtpsession, "received event %s",
1741       GST_EVENT_TYPE_NAME (event));
1742
1743   switch (GST_EVENT_TYPE (event)) {
1744     case GST_EVENT_CAPS:
1745     {
1746       GstCaps *caps;
1747
1748       /* process */
1749       gst_event_parse_caps (event, &caps);
1750       gst_rtp_session_sink_setcaps (pad, rtpsession, caps);
1751       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1752       break;
1753     }
1754     case GST_EVENT_FLUSH_STOP:
1755       gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
1756       rtpsession->recv_rtcp_segment_seqnum = GST_SEQNUM_INVALID;
1757       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1758       break;
1759     case GST_EVENT_SEGMENT:
1760     {
1761       GstSegment *segment, in_segment;
1762
1763       segment = &rtpsession->recv_rtp_seg;
1764
1765       /* the newsegment event is needed to convert the RTP timestamp to
1766        * running_time, which is needed to generate a mapping from RTP to NTP
1767        * timestamps in SR reports */
1768       gst_event_copy_segment (event, &in_segment);
1769       GST_DEBUG_OBJECT (rtpsession, "received segment %" GST_SEGMENT_FORMAT,
1770           &in_segment);
1771
1772       /* accept upstream */
1773       gst_segment_copy_into (&in_segment, segment);
1774
1775       /* push event forward */
1776       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1777       break;
1778     }
1779     case GST_EVENT_EOS:
1780     {
1781       GstPad *rtcp_src;
1782
1783       ret =
1784           gst_pad_push_event (rtpsession->recv_rtp_src, gst_event_ref (event));
1785
1786       GST_RTP_SESSION_LOCK (rtpsession);
1787       if ((rtcp_src = rtpsession->send_rtcp_src))
1788         gst_object_ref (rtcp_src);
1789       GST_RTP_SESSION_UNLOCK (rtpsession);
1790
1791       gst_event_unref (event);
1792
1793       if (rtcp_src) {
1794         event = gst_event_new_eos ();
1795         if (rtpsession->recv_rtcp_segment_seqnum != GST_SEQNUM_INVALID)
1796           gst_event_set_seqnum (event, rtpsession->recv_rtcp_segment_seqnum);
1797         ret = gst_pad_push_event (rtcp_src, event);
1798         gst_object_unref (rtcp_src);
1799       } else {
1800         ret = TRUE;
1801       }
1802       break;
1803     }
1804     default:
1805       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1806       break;
1807   }
1808
1809   return ret;
1810
1811 }
1812
1813 static gboolean
1814 gst_rtp_session_request_remote_key_unit (GstRtpSession * rtpsession,
1815     guint32 ssrc, guint payload, gboolean all_headers, gint count)
1816 {
1817   GstCaps *caps;
1818
1819   caps = gst_rtp_session_get_caps_for_pt (rtpsession, payload);
1820
1821   if (caps) {
1822     const GstStructure *s = gst_caps_get_structure (caps, 0);
1823     gboolean pli;
1824     gboolean fir;
1825
1826     pli = gst_structure_has_field (s, "rtcp-fb-nack-pli");
1827     fir = gst_structure_has_field (s, "rtcp-fb-ccm-fir") && all_headers;
1828
1829     /* Google Talk uses FIR for repair, so send it even if we just want a
1830      * regular PLI */
1831     if (!pli &&
1832         gst_structure_has_field (s, "rtcp-fb-x-gstreamer-fir-as-repair"))
1833       fir = TRUE;
1834
1835     gst_caps_unref (caps);
1836
1837     if (pli || fir)
1838       return rtp_session_request_key_unit (rtpsession->priv->session, ssrc,
1839           fir, count);
1840   }
1841
1842   return FALSE;
1843 }
1844
1845 static gboolean
1846 gst_rtp_session_event_recv_rtp_src (GstPad * pad, GstObject * parent,
1847     GstEvent * event)
1848 {
1849   GstRtpSession *rtpsession;
1850   gboolean forward = TRUE;
1851   gboolean ret = TRUE;
1852   const GstStructure *s;
1853   guint32 ssrc;
1854   guint pt;
1855
1856   rtpsession = GST_RTP_SESSION (parent);
1857
1858   switch (GST_EVENT_TYPE (event)) {
1859     case GST_EVENT_CUSTOM_UPSTREAM:
1860       s = gst_event_get_structure (event);
1861       if (gst_structure_has_name (s, "GstForceKeyUnit") &&
1862           gst_structure_get_uint (s, "ssrc", &ssrc) &&
1863           gst_structure_get_uint (s, "payload", &pt)) {
1864         gboolean all_headers = FALSE;
1865         gint count = -1;
1866
1867         gst_structure_get_boolean (s, "all-headers", &all_headers);
1868         if (gst_structure_get_int (s, "count", &count) && count < 0)
1869           count += G_MAXINT;    /* Make sure count is positive if present */
1870         if (gst_rtp_session_request_remote_key_unit (rtpsession, ssrc, pt,
1871                 all_headers, count))
1872           forward = FALSE;
1873       } else if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
1874         guint seqnum, delay, deadline, max_delay, avg_rtt;
1875
1876         GST_RTP_SESSION_LOCK (rtpsession);
1877         rtpsession->priv->recv_rtx_req_count++;
1878         GST_RTP_SESSION_UNLOCK (rtpsession);
1879
1880         if (!gst_structure_get_uint (s, "ssrc", &ssrc))
1881           ssrc = -1;
1882         if (!gst_structure_get_uint (s, "seqnum", &seqnum))
1883           seqnum = -1;
1884         if (!gst_structure_get_uint (s, "delay", &delay))
1885           delay = 0;
1886         if (!gst_structure_get_uint (s, "deadline", &deadline))
1887           deadline = 100;
1888         if (!gst_structure_get_uint (s, "avg-rtt", &avg_rtt))
1889           avg_rtt = 40;
1890
1891         /* remaining time to receive the packet */
1892         max_delay = deadline;
1893         if (max_delay > delay)
1894           max_delay -= delay;
1895         /* estimated RTT */
1896         if (max_delay > avg_rtt)
1897           max_delay -= avg_rtt;
1898         else
1899           max_delay = 0;
1900
1901         if (rtp_session_request_nack (rtpsession->priv->session, ssrc, seqnum,
1902                 max_delay * GST_MSECOND))
1903           forward = FALSE;
1904       }
1905       break;
1906     default:
1907       break;
1908   }
1909
1910   if (forward) {
1911     GstPad *recv_rtp_sink;
1912
1913     GST_RTP_SESSION_LOCK (rtpsession);
1914     if ((recv_rtp_sink = rtpsession->recv_rtp_sink))
1915       gst_object_ref (recv_rtp_sink);
1916     GST_RTP_SESSION_UNLOCK (rtpsession);
1917
1918     if (recv_rtp_sink) {
1919       ret = gst_pad_push_event (recv_rtp_sink, event);
1920       gst_object_unref (recv_rtp_sink);
1921     } else
1922       gst_event_unref (event);
1923   } else {
1924     gst_event_unref (event);
1925   }
1926
1927   return ret;
1928 }
1929
1930
1931 static GstIterator *
1932 gst_rtp_session_iterate_internal_links (GstPad * pad, GstObject * parent)
1933 {
1934   GstRtpSession *rtpsession;
1935   GstPad *otherpad = NULL;
1936   GstIterator *it = NULL;
1937
1938   rtpsession = GST_RTP_SESSION (parent);
1939
1940   GST_RTP_SESSION_LOCK (rtpsession);
1941   if (pad == rtpsession->recv_rtp_src) {
1942     otherpad = gst_object_ref (rtpsession->recv_rtp_sink);
1943   } else if (pad == rtpsession->recv_rtp_sink) {
1944     otherpad = gst_object_ref (rtpsession->recv_rtp_src);
1945   } else if (pad == rtpsession->send_rtp_src) {
1946     otherpad = gst_object_ref (rtpsession->send_rtp_sink);
1947   } else if (pad == rtpsession->send_rtp_sink) {
1948     otherpad = gst_object_ref (rtpsession->send_rtp_src);
1949   }
1950   GST_RTP_SESSION_UNLOCK (rtpsession);
1951
1952   if (otherpad) {
1953     GValue val = { 0, };
1954
1955     g_value_init (&val, GST_TYPE_PAD);
1956     g_value_set_object (&val, otherpad);
1957     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
1958     g_value_unset (&val);
1959     gst_object_unref (otherpad);
1960   } else {
1961     it = gst_iterator_new_single (GST_TYPE_PAD, NULL);
1962   }
1963
1964   return it;
1965 }
1966
1967 static gboolean
1968 gst_rtp_session_sink_setcaps (GstPad * pad, GstRtpSession * rtpsession,
1969     GstCaps * caps)
1970 {
1971   GST_RTP_SESSION_LOCK (rtpsession);
1972   gst_rtp_session_cache_caps (rtpsession, caps);
1973   GST_RTP_SESSION_UNLOCK (rtpsession);
1974
1975   return TRUE;
1976 }
1977
1978 /* receive a packet from a sender, send it to the RTP session manager and
1979  * forward the packet on the rtp_src pad
1980  */
1981 static GstFlowReturn
1982 gst_rtp_session_chain_recv_rtp (GstPad * pad, GstObject * parent,
1983     GstBuffer * buffer)
1984 {
1985   GstRtpSession *rtpsession;
1986   GstRtpSessionPrivate *priv;
1987   GstFlowReturn ret;
1988   GstClockTime current_time, running_time;
1989   GstClockTime timestamp;
1990   guint64 ntpnstime;
1991
1992   rtpsession = GST_RTP_SESSION (parent);
1993   priv = rtpsession->priv;
1994
1995   GST_LOG_OBJECT (rtpsession, "received RTP packet");
1996
1997   GST_RTP_SESSION_LOCK (rtpsession);
1998   signal_waiting_rtcp_thread_unlocked (rtpsession);
1999   GST_RTP_SESSION_UNLOCK (rtpsession);
2000
2001   /* get NTP time when this packet was captured, this depends on the timestamp. */
2002   timestamp = GST_BUFFER_PTS (buffer);
2003   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
2004     /* convert to running time using the segment values */
2005     running_time =
2006         gst_segment_to_running_time (&rtpsession->recv_rtp_seg, GST_FORMAT_TIME,
2007         timestamp);
2008     ntpnstime = GST_CLOCK_TIME_NONE;
2009   } else {
2010     get_current_times (rtpsession, &running_time, &ntpnstime);
2011   }
2012   current_time = gst_clock_get_time (priv->sysclock);
2013
2014   ret = rtp_session_process_rtp (priv->session, buffer, current_time,
2015       running_time, ntpnstime);
2016   if (ret != GST_FLOW_OK)
2017     goto push_error;
2018
2019 done:
2020
2021   return ret;
2022
2023   /* ERRORS */
2024 push_error:
2025   {
2026     GST_DEBUG_OBJECT (rtpsession, "process returned %s",
2027         gst_flow_get_name (ret));
2028     goto done;
2029   }
2030 }
2031
2032 static gboolean
2033 process_received_buffer_in_list (GstBuffer ** buffer, guint idx, gpointer data)
2034 {
2035   gint ret;
2036
2037   ret = gst_rtp_session_chain_recv_rtp (NULL, data, *buffer);
2038   if (ret != GST_FLOW_OK)
2039     GST_ERROR ("Processing individual buffer in a list failed");
2040
2041   /*
2042    * The buffer has been processed, remove it from the original list, if it was
2043    * a valid RTP buffer it has been added to the "processed" list in
2044    * gst_rtp_session_process_rtp().
2045    */
2046   *buffer = NULL;
2047   return TRUE;
2048 }
2049
2050 static GstFlowReturn
2051 gst_rtp_session_chain_recv_rtp_list (GstPad * pad, GstObject * parent,
2052     GstBufferList * list)
2053 {
2054   GstRtpSession *rtpsession = GST_RTP_SESSION (parent);
2055   GstBufferList *processed_list;
2056
2057   processed_list = gst_buffer_list_new ();
2058
2059   /* Set some private data to detect that a buffer list is being pushed. */
2060   rtpsession->priv->processed_list = processed_list;
2061
2062   /*
2063    * Individually process the buffers from the incoming buffer list as the
2064    * incoming RTP packets in the list can be mixed in all sorts of ways:
2065    *    - different frames,
2066    *    - different sources,
2067    *    - different types (RTP or RTCP)
2068    */
2069   gst_buffer_list_foreach (list,
2070       (GstBufferListFunc) process_received_buffer_in_list, parent);
2071
2072   gst_buffer_list_unref (list);
2073
2074   /* Clean up private data in case the next push does not use a buffer list. */
2075   rtpsession->priv->processed_list = NULL;
2076
2077   if (gst_buffer_list_length (processed_list) == 0 || !rtpsession->recv_rtp_src) {
2078     gst_buffer_list_unref (processed_list);
2079     return GST_FLOW_OK;
2080   }
2081
2082   GST_LOG_OBJECT (rtpsession, "pushing received RTP list");
2083   return gst_pad_push_list (rtpsession->recv_rtp_src, processed_list);
2084 }
2085
2086 static gboolean
2087 gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstObject * parent,
2088     GstEvent * event)
2089 {
2090   GstRtpSession *rtpsession;
2091   gboolean ret = FALSE;
2092
2093   rtpsession = GST_RTP_SESSION (parent);
2094
2095   GST_DEBUG_OBJECT (rtpsession, "received event %s",
2096       GST_EVENT_TYPE_NAME (event));
2097
2098   switch (GST_EVENT_TYPE (event)) {
2099     case GST_EVENT_SEGMENT:
2100       /* Make sure that the sync_src pad has caps before the segment event.
2101        * Otherwise we might get a segment event before caps from the receive
2102        * RTCP pad, and then later when receiving RTCP packets will set caps.
2103        * This will results in a sticky event misordering warning
2104        */
2105       if (!gst_pad_has_current_caps (rtpsession->sync_src)) {
2106         GstCaps *caps = gst_caps_new_empty_simple ("application/x-rtcp");
2107         gst_pad_set_caps (rtpsession->sync_src, caps);
2108         gst_caps_unref (caps);
2109       }
2110       /* fall through */
2111     default:
2112       ret = gst_pad_push_event (rtpsession->sync_src, event);
2113       break;
2114   }
2115
2116   return ret;
2117 }
2118
2119 /* Receive an RTCP packet from a sender, send it to the RTP session manager and
2120  * forward the SR packets to the sync_src pad.
2121  */
2122 static GstFlowReturn
2123 gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstObject * parent,
2124     GstBuffer * buffer)
2125 {
2126   GstRtpSession *rtpsession;
2127   GstRtpSessionPrivate *priv;
2128   GstClockTime current_time;
2129   GstClockTime running_time;
2130   guint64 ntpnstime;
2131
2132   rtpsession = GST_RTP_SESSION (parent);
2133   priv = rtpsession->priv;
2134
2135   GST_LOG_OBJECT (rtpsession, "received RTCP packet");
2136
2137   GST_RTP_SESSION_LOCK (rtpsession);
2138   signal_waiting_rtcp_thread_unlocked (rtpsession);
2139   GST_RTP_SESSION_UNLOCK (rtpsession);
2140
2141   current_time = gst_clock_get_time (priv->sysclock);
2142   get_current_times (rtpsession, &running_time, &ntpnstime);
2143
2144   rtp_session_process_rtcp (priv->session, buffer, current_time, running_time,
2145       ntpnstime);
2146
2147   return GST_FLOW_OK;           /* always return OK */
2148 }
2149
2150 static gboolean
2151 gst_rtp_session_query_send_rtcp_src (GstPad * pad, GstObject * parent,
2152     GstQuery * query)
2153 {
2154   GstRtpSession *rtpsession;
2155   gboolean ret = FALSE;
2156
2157   rtpsession = GST_RTP_SESSION (parent);
2158
2159   GST_DEBUG_OBJECT (rtpsession, "received QUERY %s",
2160       GST_QUERY_TYPE_NAME (query));
2161
2162   switch (GST_QUERY_TYPE (query)) {
2163     case GST_QUERY_LATENCY:
2164       ret = TRUE;
2165       /* use the defaults for the latency query. */
2166       gst_query_set_latency (query, FALSE, 0, -1);
2167       break;
2168     default:
2169       /* other queries simply fail for now */
2170       break;
2171   }
2172
2173   return ret;
2174 }
2175
2176 static gboolean
2177 gst_rtp_session_event_send_rtcp_src (GstPad * pad, GstObject * parent,
2178     GstEvent * event)
2179 {
2180   GstRtpSession *rtpsession;
2181   gboolean ret = TRUE;
2182
2183   rtpsession = GST_RTP_SESSION (parent);
2184   GST_DEBUG_OBJECT (rtpsession, "received EVENT %s",
2185       GST_EVENT_TYPE_NAME (event));
2186
2187   switch (GST_EVENT_TYPE (event)) {
2188     case GST_EVENT_SEEK:
2189     case GST_EVENT_LATENCY:
2190       gst_event_unref (event);
2191       ret = TRUE;
2192       break;
2193     default:
2194       /* other events simply fail for now */
2195       gst_event_unref (event);
2196       ret = FALSE;
2197       break;
2198   }
2199
2200   return ret;
2201 }
2202
2203
2204 static gboolean
2205 gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstObject * parent,
2206     GstEvent * event)
2207 {
2208   GstRtpSession *rtpsession;
2209   gboolean ret = FALSE;
2210
2211   rtpsession = GST_RTP_SESSION (parent);
2212
2213   GST_DEBUG_OBJECT (rtpsession, "received EVENT %s",
2214       GST_EVENT_TYPE_NAME (event));
2215
2216   switch (GST_EVENT_TYPE (event)) {
2217     case GST_EVENT_CAPS:
2218     {
2219       GstCaps *caps;
2220
2221       /* process */
2222       gst_event_parse_caps (event, &caps);
2223       gst_rtp_session_setcaps_send_rtp (pad, rtpsession, caps);
2224       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
2225       break;
2226     }
2227     case GST_EVENT_FLUSH_STOP:
2228       gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
2229       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
2230       break;
2231     case GST_EVENT_SEGMENT:{
2232       GstSegment *segment, in_segment;
2233
2234       segment = &rtpsession->send_rtp_seg;
2235
2236       /* the newsegment event is needed to convert the RTP timestamp to
2237        * running_time, which is needed to generate a mapping from RTP to NTP
2238        * timestamps in SR reports */
2239       gst_event_copy_segment (event, &in_segment);
2240       GST_DEBUG_OBJECT (rtpsession, "received segment %" GST_SEGMENT_FORMAT,
2241           &in_segment);
2242
2243       /* accept upstream */
2244       gst_segment_copy_into (&in_segment, segment);
2245
2246       /* push event forward */
2247       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
2248       break;
2249     }
2250     case GST_EVENT_EOS:{
2251       GstClockTime current_time;
2252
2253       /* push downstream FIXME, we are not supposed to leave the session just
2254        * because we stop sending. */
2255       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
2256       current_time = gst_clock_get_time (rtpsession->priv->sysclock);
2257
2258       GST_DEBUG_OBJECT (rtpsession, "scheduling BYE message");
2259       rtp_session_mark_all_bye (rtpsession->priv->session, "End Of Stream");
2260       rtp_session_schedule_bye (rtpsession->priv->session, current_time);
2261       break;
2262     }
2263     default:{
2264       GstPad *send_rtp_src;
2265
2266       GST_RTP_SESSION_LOCK (rtpsession);
2267       if ((send_rtp_src = rtpsession->send_rtp_src))
2268         gst_object_ref (send_rtp_src);
2269       GST_RTP_SESSION_UNLOCK (rtpsession);
2270
2271       if (send_rtp_src) {
2272         ret = gst_pad_push_event (send_rtp_src, event);
2273         gst_object_unref (send_rtp_src);
2274       } else
2275         gst_event_unref (event);
2276
2277       break;
2278     }
2279   }
2280
2281   return ret;
2282 }
2283
2284 static gboolean
2285 gst_rtp_session_event_send_rtp_src (GstPad * pad, GstObject * parent,
2286     GstEvent * event)
2287 {
2288   GstRtpSession *rtpsession;
2289   gboolean ret = FALSE;
2290
2291   rtpsession = GST_RTP_SESSION (parent);
2292
2293   GST_DEBUG_OBJECT (rtpsession, "received EVENT %s",
2294       GST_EVENT_TYPE_NAME (event));
2295
2296   switch (GST_EVENT_TYPE (event)) {
2297     case GST_EVENT_LATENCY:
2298       /* save the latency, we need this to know when an RTP packet will be
2299        * rendered by the sink */
2300       gst_event_parse_latency (event, &rtpsession->priv->send_latency);
2301
2302       ret = gst_pad_event_default (pad, parent, event);
2303       break;
2304     default:
2305       ret = gst_pad_event_default (pad, parent, event);
2306       break;
2307   }
2308   return ret;
2309 }
2310
2311 static GstCaps *
2312 gst_rtp_session_getcaps_send_rtp (GstPad * pad, GstRtpSession * rtpsession,
2313     GstCaps * filter)
2314 {
2315   GstRtpSessionPrivate *priv;
2316   GstCaps *result;
2317   GstStructure *s1, *s2;
2318   guint ssrc;
2319   gboolean is_random;
2320
2321   priv = rtpsession->priv;
2322
2323   ssrc = rtp_session_suggest_ssrc (priv->session, &is_random);
2324
2325   /* we can basically accept anything but we prefer to receive packets with our
2326    * internal SSRC so that we don't have to patch it. Create a structure with
2327    * the SSRC and another one without.
2328    * Only do this if the session actually decided on an ssrc already,
2329    * otherwise we give upstream the opportunity to select an ssrc itself */
2330   if (!is_random) {
2331     s1 = gst_structure_new ("application/x-rtp", "ssrc", G_TYPE_UINT, ssrc,
2332         NULL);
2333     s2 = gst_structure_new_empty ("application/x-rtp");
2334
2335     result = gst_caps_new_full (s1, s2, NULL);
2336   } else {
2337     result = gst_caps_new_empty_simple ("application/x-rtp");
2338   }
2339
2340   if (filter) {
2341     GstCaps *caps = result;
2342
2343     result = gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
2344     gst_caps_unref (caps);
2345   }
2346
2347   GST_DEBUG_OBJECT (rtpsession, "getting caps %" GST_PTR_FORMAT, result);
2348
2349   return result;
2350 }
2351
2352 static gboolean
2353 gst_rtp_session_query_send_rtp (GstPad * pad, GstObject * parent,
2354     GstQuery * query)
2355 {
2356   gboolean res = FALSE;
2357   GstRtpSession *rtpsession;
2358
2359   rtpsession = GST_RTP_SESSION (parent);
2360
2361   switch (GST_QUERY_TYPE (query)) {
2362     case GST_QUERY_CAPS:
2363     {
2364       GstCaps *filter, *caps;
2365
2366       gst_query_parse_caps (query, &filter);
2367       caps = gst_rtp_session_getcaps_send_rtp (pad, rtpsession, filter);
2368       gst_query_set_caps_result (query, caps);
2369       gst_caps_unref (caps);
2370       res = TRUE;
2371       break;
2372     }
2373     default:
2374       res = gst_pad_query_default (pad, parent, query);
2375       break;
2376   }
2377
2378   return res;
2379 }
2380
2381 static gboolean
2382 gst_rtp_session_setcaps_send_rtp (GstPad * pad, GstRtpSession * rtpsession,
2383     GstCaps * caps)
2384 {
2385   GstRtpSessionPrivate *priv;
2386
2387   priv = rtpsession->priv;
2388
2389   rtp_session_update_send_caps (priv->session, caps);
2390
2391   return TRUE;
2392 }
2393
2394 /* Receive an RTP packet or a list of packets to be sent to the receivers,
2395  * send to RTP session manager and forward to send_rtp_src.
2396  */
2397 static GstFlowReturn
2398 gst_rtp_session_chain_send_rtp_common (GstRtpSession * rtpsession,
2399     gpointer data, gboolean is_list)
2400 {
2401   GstRtpSessionPrivate *priv;
2402   GstFlowReturn ret;
2403   GstClockTime timestamp, running_time;
2404   GstClockTime current_time;
2405
2406   priv = rtpsession->priv;
2407
2408   GST_LOG_OBJECT (rtpsession, "received RTP %s", is_list ? "list" : "packet");
2409
2410   /* get NTP time when this packet was captured, this depends on the timestamp. */
2411   if (is_list) {
2412     GstBuffer *buffer = NULL;
2413
2414     /* All buffers in a list have the same timestamp.
2415      * So, just take it from the first buffer. */
2416     buffer = gst_buffer_list_get (GST_BUFFER_LIST_CAST (data), 0);
2417     if (buffer)
2418       timestamp = GST_BUFFER_PTS (buffer);
2419     else
2420       timestamp = -1;
2421   } else {
2422     timestamp = GST_BUFFER_PTS (GST_BUFFER_CAST (data));
2423   }
2424
2425   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
2426     /* convert to running time using the segment start value. */
2427     running_time =
2428         gst_segment_to_running_time (&rtpsession->send_rtp_seg, GST_FORMAT_TIME,
2429         timestamp);
2430     if (priv->rtcp_sync_send_time)
2431       running_time += priv->send_latency;
2432   } else {
2433     /* no timestamp. */
2434     running_time = -1;
2435   }
2436
2437   current_time = gst_clock_get_time (priv->sysclock);
2438   ret = rtp_session_send_rtp (priv->session, data, is_list, current_time,
2439       running_time);
2440   if (ret != GST_FLOW_OK)
2441     goto push_error;
2442
2443 done:
2444
2445   return ret;
2446
2447   /* ERRORS */
2448 push_error:
2449   {
2450     GST_DEBUG_OBJECT (rtpsession, "process returned %s",
2451         gst_flow_get_name (ret));
2452     goto done;
2453   }
2454 }
2455
2456 static GstFlowReturn
2457 gst_rtp_session_chain_send_rtp (GstPad * pad, GstObject * parent,
2458     GstBuffer * buffer)
2459 {
2460   GstRtpSession *rtpsession = GST_RTP_SESSION (parent);
2461
2462   return gst_rtp_session_chain_send_rtp_common (rtpsession, buffer, FALSE);
2463 }
2464
2465 static GstFlowReturn
2466 gst_rtp_session_chain_send_rtp_list (GstPad * pad, GstObject * parent,
2467     GstBufferList * list)
2468 {
2469   GstRtpSession *rtpsession = GST_RTP_SESSION (parent);
2470
2471   return gst_rtp_session_chain_send_rtp_common (rtpsession, list, TRUE);
2472 }
2473
2474 /* Create sinkpad to receive RTP packets from senders. This will also create a
2475  * srcpad for the RTP packets.
2476  */
2477 static GstPad *
2478 create_recv_rtp_sink (GstRtpSession * rtpsession)
2479 {
2480   GST_DEBUG_OBJECT (rtpsession, "creating RTP sink pad");
2481
2482   rtpsession->recv_rtp_sink =
2483       gst_pad_new_from_static_template (&rtpsession_recv_rtp_sink_template,
2484       "recv_rtp_sink");
2485   gst_pad_set_chain_function (rtpsession->recv_rtp_sink,
2486       gst_rtp_session_chain_recv_rtp);
2487   gst_pad_set_chain_list_function (rtpsession->recv_rtp_sink,
2488       gst_rtp_session_chain_recv_rtp_list);
2489   gst_pad_set_event_function (rtpsession->recv_rtp_sink,
2490       gst_rtp_session_event_recv_rtp_sink);
2491   gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtp_sink,
2492       gst_rtp_session_iterate_internal_links);
2493   GST_PAD_SET_PROXY_ALLOCATION (rtpsession->recv_rtp_sink);
2494   gst_pad_set_active (rtpsession->recv_rtp_sink, TRUE);
2495   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
2496       rtpsession->recv_rtp_sink);
2497
2498   GST_DEBUG_OBJECT (rtpsession, "creating RTP src pad");
2499   rtpsession->recv_rtp_src =
2500       gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template,
2501       "recv_rtp_src");
2502   gst_pad_set_event_function (rtpsession->recv_rtp_src,
2503       gst_rtp_session_event_recv_rtp_src);
2504   gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtp_src,
2505       gst_rtp_session_iterate_internal_links);
2506   gst_pad_use_fixed_caps (rtpsession->recv_rtp_src);
2507   gst_pad_set_active (rtpsession->recv_rtp_src, TRUE);
2508   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_src);
2509
2510   return rtpsession->recv_rtp_sink;
2511 }
2512
2513 /* Remove sinkpad to receive RTP packets from senders. This will also remove
2514  * the srcpad for the RTP packets.
2515  */
2516 static void
2517 remove_recv_rtp_sink (GstRtpSession * rtpsession)
2518 {
2519   GST_DEBUG_OBJECT (rtpsession, "removing RTP sink pad");
2520
2521   /* deactivate from source to sink */
2522   gst_pad_set_active (rtpsession->recv_rtp_src, FALSE);
2523   gst_pad_set_active (rtpsession->recv_rtp_sink, FALSE);
2524
2525   /* remove pads */
2526   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2527       rtpsession->recv_rtp_sink);
2528   rtpsession->recv_rtp_sink = NULL;
2529
2530   GST_DEBUG_OBJECT (rtpsession, "removing RTP src pad");
2531   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2532       rtpsession->recv_rtp_src);
2533   rtpsession->recv_rtp_src = NULL;
2534 }
2535
2536 /* Create a sinkpad to receive RTCP messages from senders, this will also create a
2537  * sync_src pad for the SR packets.
2538  */
2539 static GstPad *
2540 create_recv_rtcp_sink (GstRtpSession * rtpsession)
2541 {
2542   GST_DEBUG_OBJECT (rtpsession, "creating RTCP sink pad");
2543
2544   rtpsession->recv_rtcp_sink =
2545       gst_pad_new_from_static_template (&rtpsession_recv_rtcp_sink_template,
2546       "recv_rtcp_sink");
2547   gst_pad_set_chain_function (rtpsession->recv_rtcp_sink,
2548       gst_rtp_session_chain_recv_rtcp);
2549   gst_pad_set_event_function (rtpsession->recv_rtcp_sink,
2550       gst_rtp_session_event_recv_rtcp_sink);
2551   gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtcp_sink,
2552       gst_rtp_session_iterate_internal_links);
2553   gst_pad_set_active (rtpsession->recv_rtcp_sink, TRUE);
2554   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
2555       rtpsession->recv_rtcp_sink);
2556
2557   GST_DEBUG_OBJECT (rtpsession, "creating sync src pad");
2558   rtpsession->sync_src =
2559       gst_pad_new_from_static_template (&rtpsession_sync_src_template,
2560       "sync_src");
2561   gst_pad_set_iterate_internal_links_function (rtpsession->sync_src,
2562       gst_rtp_session_iterate_internal_links);
2563   gst_pad_use_fixed_caps (rtpsession->sync_src);
2564   gst_pad_set_active (rtpsession->sync_src, TRUE);
2565   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->sync_src);
2566
2567   return rtpsession->recv_rtcp_sink;
2568 }
2569
2570 static void
2571 remove_recv_rtcp_sink (GstRtpSession * rtpsession)
2572 {
2573   GST_DEBUG_OBJECT (rtpsession, "removing RTCP sink pad");
2574
2575   gst_pad_set_active (rtpsession->sync_src, FALSE);
2576   gst_pad_set_active (rtpsession->recv_rtcp_sink, FALSE);
2577
2578   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2579       rtpsession->recv_rtcp_sink);
2580   rtpsession->recv_rtcp_sink = NULL;
2581
2582   GST_DEBUG_OBJECT (rtpsession, "removing sync src pad");
2583   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->sync_src);
2584   rtpsession->sync_src = NULL;
2585 }
2586
2587 /* Create a sinkpad to receive RTP packets for receivers. This will also create a
2588  * send_rtp_src pad.
2589  */
2590 static GstPad *
2591 create_send_rtp_sink (GstRtpSession * rtpsession)
2592 {
2593   GST_DEBUG_OBJECT (rtpsession, "creating pad");
2594
2595   rtpsession->send_rtp_sink =
2596       gst_pad_new_from_static_template (&rtpsession_send_rtp_sink_template,
2597       "send_rtp_sink");
2598   gst_pad_set_chain_function (rtpsession->send_rtp_sink,
2599       gst_rtp_session_chain_send_rtp);
2600   gst_pad_set_chain_list_function (rtpsession->send_rtp_sink,
2601       gst_rtp_session_chain_send_rtp_list);
2602   gst_pad_set_query_function (rtpsession->send_rtp_sink,
2603       gst_rtp_session_query_send_rtp);
2604   gst_pad_set_event_function (rtpsession->send_rtp_sink,
2605       gst_rtp_session_event_send_rtp_sink);
2606   gst_pad_set_iterate_internal_links_function (rtpsession->send_rtp_sink,
2607       gst_rtp_session_iterate_internal_links);
2608   GST_PAD_SET_PROXY_CAPS (rtpsession->send_rtp_sink);
2609   GST_PAD_SET_PROXY_ALLOCATION (rtpsession->send_rtp_sink);
2610   gst_pad_set_active (rtpsession->send_rtp_sink, TRUE);
2611   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
2612       rtpsession->send_rtp_sink);
2613
2614   rtpsession->send_rtp_src =
2615       gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template,
2616       "send_rtp_src");
2617   gst_pad_set_iterate_internal_links_function (rtpsession->send_rtp_src,
2618       gst_rtp_session_iterate_internal_links);
2619   gst_pad_set_event_function (rtpsession->send_rtp_src,
2620       gst_rtp_session_event_send_rtp_src);
2621   GST_PAD_SET_PROXY_CAPS (rtpsession->send_rtp_src);
2622   gst_pad_set_active (rtpsession->send_rtp_src, TRUE);
2623   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src);
2624
2625   return rtpsession->send_rtp_sink;
2626 }
2627
2628 static void
2629 remove_send_rtp_sink (GstRtpSession * rtpsession)
2630 {
2631   GST_DEBUG_OBJECT (rtpsession, "removing pad");
2632
2633   gst_pad_set_active (rtpsession->send_rtp_src, FALSE);
2634   gst_pad_set_active (rtpsession->send_rtp_sink, FALSE);
2635
2636   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2637       rtpsession->send_rtp_sink);
2638   rtpsession->send_rtp_sink = NULL;
2639
2640   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2641       rtpsession->send_rtp_src);
2642   rtpsession->send_rtp_src = NULL;
2643 }
2644
2645 /* Create a srcpad with the RTCP packets to send out.
2646  * This pad will be driven by the RTP session manager when it wants to send out
2647  * RTCP packets.
2648  */
2649 static GstPad *
2650 create_send_rtcp_src (GstRtpSession * rtpsession)
2651 {
2652   GST_DEBUG_OBJECT (rtpsession, "creating pad");
2653
2654   rtpsession->send_rtcp_src =
2655       gst_pad_new_from_static_template (&rtpsession_send_rtcp_src_template,
2656       "send_rtcp_src");
2657   gst_pad_use_fixed_caps (rtpsession->send_rtcp_src);
2658   gst_pad_set_active (rtpsession->send_rtcp_src, TRUE);
2659   gst_pad_set_iterate_internal_links_function (rtpsession->send_rtcp_src,
2660       gst_rtp_session_iterate_internal_links);
2661   gst_pad_set_query_function (rtpsession->send_rtcp_src,
2662       gst_rtp_session_query_send_rtcp_src);
2663   gst_pad_set_event_function (rtpsession->send_rtcp_src,
2664       gst_rtp_session_event_send_rtcp_src);
2665   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
2666       rtpsession->send_rtcp_src);
2667
2668   return rtpsession->send_rtcp_src;
2669 }
2670
2671 static void
2672 remove_send_rtcp_src (GstRtpSession * rtpsession)
2673 {
2674   GST_DEBUG_OBJECT (rtpsession, "removing pad");
2675
2676   gst_pad_set_active (rtpsession->send_rtcp_src, FALSE);
2677
2678   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2679       rtpsession->send_rtcp_src);
2680   rtpsession->send_rtcp_src = NULL;
2681 }
2682
2683 static GstPad *
2684 gst_rtp_session_request_new_pad (GstElement * element,
2685     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
2686 {
2687   GstRtpSession *rtpsession;
2688   GstElementClass *klass;
2689   GstPad *result;
2690
2691   g_return_val_if_fail (templ != NULL, NULL);
2692   g_return_val_if_fail (GST_IS_RTP_SESSION (element), NULL);
2693
2694   rtpsession = GST_RTP_SESSION (element);
2695   klass = GST_ELEMENT_GET_CLASS (element);
2696
2697   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
2698
2699   GST_RTP_SESSION_LOCK (rtpsession);
2700
2701   /* figure out the template */
2702   if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink")) {
2703     if (rtpsession->recv_rtp_sink != NULL)
2704       goto exists;
2705
2706     result = create_recv_rtp_sink (rtpsession);
2707   } else if (templ == gst_element_class_get_pad_template (klass,
2708           "recv_rtcp_sink")) {
2709     if (rtpsession->recv_rtcp_sink != NULL)
2710       goto exists;
2711
2712     result = create_recv_rtcp_sink (rtpsession);
2713   } else if (templ == gst_element_class_get_pad_template (klass,
2714           "send_rtp_sink")) {
2715     if (rtpsession->send_rtp_sink != NULL)
2716       goto exists;
2717
2718     result = create_send_rtp_sink (rtpsession);
2719   } else if (templ == gst_element_class_get_pad_template (klass,
2720           "send_rtcp_src")) {
2721     if (rtpsession->send_rtcp_src != NULL)
2722       goto exists;
2723
2724     result = create_send_rtcp_src (rtpsession);
2725   } else
2726     goto wrong_template;
2727
2728   GST_RTP_SESSION_UNLOCK (rtpsession);
2729
2730   return result;
2731
2732   /* ERRORS */
2733 wrong_template:
2734   {
2735     GST_RTP_SESSION_UNLOCK (rtpsession);
2736     g_warning ("rtpsession: this is not our template");
2737     return NULL;
2738   }
2739 exists:
2740   {
2741     GST_RTP_SESSION_UNLOCK (rtpsession);
2742     g_warning ("rtpsession: pad already requested");
2743     return NULL;
2744   }
2745 }
2746
2747 static void
2748 gst_rtp_session_release_pad (GstElement * element, GstPad * pad)
2749 {
2750   GstRtpSession *rtpsession;
2751
2752   g_return_if_fail (GST_IS_RTP_SESSION (element));
2753   g_return_if_fail (GST_IS_PAD (pad));
2754
2755   rtpsession = GST_RTP_SESSION (element);
2756
2757   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
2758
2759   GST_RTP_SESSION_LOCK (rtpsession);
2760
2761   if (rtpsession->recv_rtp_sink == pad) {
2762     remove_recv_rtp_sink (rtpsession);
2763   } else if (rtpsession->recv_rtcp_sink == pad) {
2764     remove_recv_rtcp_sink (rtpsession);
2765   } else if (rtpsession->send_rtp_sink == pad) {
2766     remove_send_rtp_sink (rtpsession);
2767   } else if (rtpsession->send_rtcp_src == pad) {
2768     remove_send_rtcp_src (rtpsession);
2769   } else
2770     goto wrong_pad;
2771
2772   GST_RTP_SESSION_UNLOCK (rtpsession);
2773
2774   return;
2775
2776   /* ERRORS */
2777 wrong_pad:
2778   {
2779     GST_RTP_SESSION_UNLOCK (rtpsession);
2780     g_warning ("rtpsession: asked to release an unknown pad");
2781     return;
2782   }
2783 }
2784
2785 static void
2786 gst_rtp_session_request_key_unit (RTPSession * sess,
2787     guint32 ssrc, gboolean all_headers, gpointer user_data)
2788 {
2789   GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
2790   GstEvent *event;
2791   GstPad *send_rtp_sink;
2792
2793   GST_RTP_SESSION_LOCK (rtpsession);
2794   if ((send_rtp_sink = rtpsession->send_rtp_sink))
2795     gst_object_ref (send_rtp_sink);
2796   GST_RTP_SESSION_UNLOCK (rtpsession);
2797
2798   if (send_rtp_sink) {
2799     event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
2800         gst_structure_new ("GstForceKeyUnit", "ssrc", G_TYPE_UINT, ssrc,
2801             "all-headers", G_TYPE_BOOLEAN, all_headers, NULL));
2802     gst_pad_push_event (send_rtp_sink, event);
2803     gst_object_unref (send_rtp_sink);
2804   }
2805 }
2806
2807 static GstClockTime
2808 gst_rtp_session_request_time (RTPSession * session, gpointer user_data)
2809 {
2810   GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
2811
2812   return gst_clock_get_time (rtpsession->priv->sysclock);
2813 }
2814
2815 static void
2816 gst_rtp_session_notify_nack (RTPSession * sess, guint16 seqnum,
2817     guint16 blp, guint32 ssrc, gpointer user_data)
2818 {
2819   GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
2820   GstEvent *event;
2821   GstPad *send_rtp_sink;
2822
2823   GST_RTP_SESSION_LOCK (rtpsession);
2824   if ((send_rtp_sink = rtpsession->send_rtp_sink))
2825     gst_object_ref (send_rtp_sink);
2826   GST_RTP_SESSION_UNLOCK (rtpsession);
2827
2828   if (send_rtp_sink) {
2829     while (TRUE) {
2830       event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
2831           gst_structure_new ("GstRTPRetransmissionRequest",
2832               "seqnum", G_TYPE_UINT, (guint) seqnum,
2833               "ssrc", G_TYPE_UINT, (guint) ssrc, NULL));
2834       gst_pad_push_event (send_rtp_sink, event);
2835
2836       GST_RTP_SESSION_LOCK (rtpsession);
2837       rtpsession->priv->sent_rtx_req_count++;
2838       GST_RTP_SESSION_UNLOCK (rtpsession);
2839
2840       if (blp == 0)
2841         break;
2842
2843       seqnum++;
2844       while ((blp & 1) == 0) {
2845         seqnum++;
2846         blp >>= 1;
2847       }
2848       blp >>= 1;
2849     }
2850     gst_object_unref (send_rtp_sink);
2851   }
2852 }
2853
2854 static void
2855 gst_rtp_session_notify_twcc (RTPSession * sess,
2856     GstStructure * twcc_packets, GstStructure * twcc_stats, gpointer user_data)
2857 {
2858   GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
2859   GstEvent *event;
2860   GstPad *send_rtp_sink;
2861
2862   GST_RTP_SESSION_LOCK (rtpsession);
2863   if ((send_rtp_sink = rtpsession->send_rtp_sink))
2864     gst_object_ref (send_rtp_sink);
2865   if (rtpsession->priv->last_twcc_stats)
2866     gst_structure_free (rtpsession->priv->last_twcc_stats);
2867   rtpsession->priv->last_twcc_stats = twcc_stats;
2868   GST_RTP_SESSION_UNLOCK (rtpsession);
2869
2870   if (send_rtp_sink) {
2871     event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, twcc_packets);
2872     gst_pad_push_event (send_rtp_sink, event);
2873     gst_object_unref (send_rtp_sink);
2874   }
2875
2876   g_object_notify (G_OBJECT (rtpsession), "twcc-stats");
2877 }
2878
2879 static void
2880 gst_rtp_session_reconfigure (RTPSession * sess, gpointer user_data)
2881 {
2882   GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
2883   GstPad *send_rtp_sink;
2884
2885   GST_RTP_SESSION_LOCK (rtpsession);
2886   if ((send_rtp_sink = rtpsession->send_rtp_sink))
2887     gst_object_ref (send_rtp_sink);
2888   GST_RTP_SESSION_UNLOCK (rtpsession);
2889
2890   if (send_rtp_sink) {
2891     gst_pad_push_event (send_rtp_sink, gst_event_new_reconfigure ());
2892     gst_object_unref (send_rtp_sink);
2893   }
2894 }
2895
2896 static void
2897 gst_rtp_session_notify_early_rtcp (RTPSession * sess, gpointer user_data)
2898 {
2899   GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
2900
2901   GST_DEBUG_OBJECT (rtpsession, "Notified of early RTCP");
2902   /* with an early RTCP request, we might have to start the RTCP thread */
2903   GST_RTP_SESSION_LOCK (rtpsession);
2904   signal_waiting_rtcp_thread_unlocked (rtpsession);
2905   GST_RTP_SESSION_UNLOCK (rtpsession);
2906 }