rtpsession: Don't start the RTCP thread until it's needed
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-rtpsession
22  * @see_also: rtpjitterbuffer, rtpbin, rtpptdemux, rtpssrcdemux
23  *
24  * The RTP session manager models participants with unique SSRC in an RTP
25  * session. This session can be used to send and receive RTP and RTCP packets.
26  * Based on what REQUEST pads are requested from the session manager, specific
27  * functionality can be activated.
28  *
29  * The session manager currently implements RFC 3550 including:
30  * <itemizedlist>
31  *   <listitem>
32  *     <para>RTP packet validation based on consecutive sequence numbers.</para>
33  *   </listitem>
34  *   <listitem>
35  *     <para>Maintainance of the SSRC participant database.</para>
36  *   </listitem>
37  *   <listitem>
38  *     <para>Keeping per participant statistics based on received RTCP packets.</para>
39  *   </listitem>
40  *   <listitem>
41  *     <para>Scheduling of RR/SR RTCP packets.</para>
42  *   </listitem>
43  *   <listitem>
44  *     <para>Support for multiple sender SSRC.</para>
45  *   </listitem>
46  * </itemizedlist>
47  *
48  * The rtpsession will not demux packets based on SSRC or payload type, nor will
49  * it correct for packet reordering and jitter. Use #GstRtpsSrcDemux,
50  * #GstRtpPtDemux and GstRtpJitterBuffer in addition to #GstRtpSession to
51  * perform these tasks. It is usually a good idea to use #GstRtpBin, which
52  * combines all these features in one element.
53  *
54  * To use #GstRtpSession as an RTP receiver, request a recv_rtp_sink pad, which will
55  * automatically create recv_rtp_src pad. Data received on the recv_rtp_sink pad
56  * will be processed in the session and after being validated forwarded on the
57  * recv_rtp_src pad.
58  *
59  * To also use #GstRtpSession as an RTCP receiver, request a recv_rtcp_sink pad,
60  * which will automatically create a sync_src pad. Packets received on the RTCP
61  * pad will be used by the session manager to update the stats and database of
62  * the other participants. SR packets will be forwarded on the sync_src pad
63  * so that they can be used to perform inter-stream synchronisation when needed.
64  *
65  * If you want the session manager to generate and send RTCP packets, request
66  * the send_rtcp_src pad. Packet pushed on this pad contain SR/RR RTCP reports
67  * that should be sent to all participants in the session.
68  *
69  * To use #GstRtpSession as a sender, request a send_rtp_sink pad, which will
70  * automatically create a send_rtp_src pad. The session manager will
71  * forward the packets on the send_rtp_src pad after updating its internal state.
72  *
73  * The session manager needs the clock-rate of the payload types it is handling
74  * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
75  * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
76  * signal.
77  *
78  * <refsect2>
79  * <title>Example pipelines</title>
80  * |[
81  * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink rtpsession .recv_rtp_src ! rtptheoradepay ! theoradec ! xvimagesink
82  * ]| Receive theora RTP packets from port 5000 and send them to the depayloader,
83  * decoder and display. Note that the application/x-rtp caps on udpsrc should be
84  * configured based on some negotiation process such as RTSP for this pipeline
85  * to work correctly.
86  * |[
87  * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink rtpsession name=session \
88  *        .recv_rtp_src ! rtptheoradepay ! theoradec ! xvimagesink \
89  *     udpsrc port=5001 caps="application/x-rtcp" ! session.recv_rtcp_sink
90  * ]| Receive theora RTP packets from port 5000 and send them to the depayloader,
91  * decoder and display. Receive RTCP packets from port 5001 and process them in
92  * the session manager.
93  * Note that the application/x-rtp caps on udpsrc should be
94  * configured based on some negotiation process such as RTSP for this pipeline
95  * to work correctly.
96  * |[
97  * gst-launch-1.0 videotestsrc ! theoraenc ! rtptheorapay ! .send_rtp_sink rtpsession .send_rtp_src ! udpsink port=5000
98  * ]| Send theora RTP packets through the session manager and out on UDP port
99  * 5000.
100  * |[
101  * gst-launch-1.0 videotestsrc ! theoraenc ! rtptheorapay ! .send_rtp_sink rtpsession name=session .send_rtp_src \
102  *     ! udpsink port=5000  session.send_rtcp_src ! udpsink port=5001
103  * ]| Send theora RTP packets through the session manager and out on UDP port
104  * 5000. Send RTCP packets on port 5001. Note that this pipeline will not preroll
105  * correctly because the second udpsink will not preroll correctly (no RTCP
106  * packets are sent in the PAUSED state). Applications should manually set and
107  * keep (see gst_element_set_locked_state()) the RTCP udpsink to the PLAYING state.
108  * </refsect2>
109  */
110
111 #ifdef HAVE_CONFIG_H
112 #include "config.h"
113 #endif
114
115 #include <gst/rtp/gstrtpbuffer.h>
116
117 #include <gst/glib-compat-private.h>
118
119 #include "gstrtpsession.h"
120 #include "rtpsession.h"
121
122 GST_DEBUG_CATEGORY_STATIC (gst_rtp_session_debug);
123 #define GST_CAT_DEFAULT gst_rtp_session_debug
124
125 GType
126 gst_rtp_ntp_time_source_get_type (void)
127 {
128   static GType type = 0;
129   static const GEnumValue values[] = {
130     {GST_RTP_NTP_TIME_SOURCE_NTP, "NTP time based on realtime clock", "ntp"},
131     {GST_RTP_NTP_TIME_SOURCE_UNIX, "UNIX time based on realtime clock", "unix"},
132     {GST_RTP_NTP_TIME_SOURCE_RUNNING_TIME,
133           "Running time based on pipeline clock",
134         "running-time"},
135     {GST_RTP_NTP_TIME_SOURCE_CLOCK_TIME, "Pipeline clock time", "clock-time"},
136     {0, NULL, NULL},
137   };
138
139   if (!type) {
140     type = g_enum_register_static ("GstRtpNtpTimeSource", values);
141   }
142   return type;
143 }
144
145 /* sink pads */
146 static GstStaticPadTemplate rtpsession_recv_rtp_sink_template =
147 GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink",
148     GST_PAD_SINK,
149     GST_PAD_REQUEST,
150     GST_STATIC_CAPS ("application/x-rtp")
151     );
152
153 static GstStaticPadTemplate rtpsession_recv_rtcp_sink_template =
154 GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink",
155     GST_PAD_SINK,
156     GST_PAD_REQUEST,
157     GST_STATIC_CAPS ("application/x-rtcp")
158     );
159
160 static GstStaticPadTemplate rtpsession_send_rtp_sink_template =
161 GST_STATIC_PAD_TEMPLATE ("send_rtp_sink",
162     GST_PAD_SINK,
163     GST_PAD_REQUEST,
164     GST_STATIC_CAPS ("application/x-rtp")
165     );
166
167 /* src pads */
168 static GstStaticPadTemplate rtpsession_recv_rtp_src_template =
169 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src",
170     GST_PAD_SRC,
171     GST_PAD_SOMETIMES,
172     GST_STATIC_CAPS ("application/x-rtp")
173     );
174
175 static GstStaticPadTemplate rtpsession_sync_src_template =
176 GST_STATIC_PAD_TEMPLATE ("sync_src",
177     GST_PAD_SRC,
178     GST_PAD_SOMETIMES,
179     GST_STATIC_CAPS ("application/x-rtcp")
180     );
181
182 static GstStaticPadTemplate rtpsession_send_rtp_src_template =
183 GST_STATIC_PAD_TEMPLATE ("send_rtp_src",
184     GST_PAD_SRC,
185     GST_PAD_SOMETIMES,
186     GST_STATIC_CAPS ("application/x-rtp")
187     );
188
189 static GstStaticPadTemplate rtpsession_send_rtcp_src_template =
190 GST_STATIC_PAD_TEMPLATE ("send_rtcp_src",
191     GST_PAD_SRC,
192     GST_PAD_REQUEST,
193     GST_STATIC_CAPS ("application/x-rtcp")
194     );
195
196 /* signals and args */
197 enum
198 {
199   SIGNAL_REQUEST_PT_MAP,
200   SIGNAL_CLEAR_PT_MAP,
201
202   SIGNAL_ON_NEW_SSRC,
203   SIGNAL_ON_SSRC_COLLISION,
204   SIGNAL_ON_SSRC_VALIDATED,
205   SIGNAL_ON_SSRC_ACTIVE,
206   SIGNAL_ON_SSRC_SDES,
207   SIGNAL_ON_BYE_SSRC,
208   SIGNAL_ON_BYE_TIMEOUT,
209   SIGNAL_ON_TIMEOUT,
210   SIGNAL_ON_SENDER_TIMEOUT,
211   SIGNAL_ON_NEW_SENDER_SSRC,
212   SIGNAL_ON_SENDER_SSRC_ACTIVE,
213   LAST_SIGNAL
214 };
215
216 #define DEFAULT_BANDWIDTH            0
217 #define DEFAULT_RTCP_FRACTION        RTP_STATS_RTCP_FRACTION
218 #define DEFAULT_RTCP_RR_BANDWIDTH    -1
219 #define DEFAULT_RTCP_RS_BANDWIDTH    -1
220 #define DEFAULT_SDES                 NULL
221 #define DEFAULT_NUM_SOURCES          0
222 #define DEFAULT_NUM_ACTIVE_SOURCES   0
223 #define DEFAULT_USE_PIPELINE_CLOCK   FALSE
224 #define DEFAULT_RTCP_MIN_INTERVAL    (RTP_STATS_MIN_INTERVAL * GST_SECOND)
225 #define DEFAULT_PROBATION            RTP_DEFAULT_PROBATION
226 #define DEFAULT_MAX_DROPOUT_TIME     60000
227 #define DEFAULT_MAX_MISORDER_TIME    2000
228 #define DEFAULT_RTP_PROFILE          GST_RTP_PROFILE_AVP
229 #define DEFAULT_NTP_TIME_SOURCE      GST_RTP_NTP_TIME_SOURCE_NTP
230 #define DEFAULT_RTCP_SYNC_SEND_TIME  TRUE
231
232 enum
233 {
234   PROP_0,
235   PROP_BANDWIDTH,
236   PROP_RTCP_FRACTION,
237   PROP_RTCP_RR_BANDWIDTH,
238   PROP_RTCP_RS_BANDWIDTH,
239   PROP_SDES,
240   PROP_NUM_SOURCES,
241   PROP_NUM_ACTIVE_SOURCES,
242   PROP_INTERNAL_SESSION,
243   PROP_USE_PIPELINE_CLOCK,
244   PROP_RTCP_MIN_INTERVAL,
245   PROP_PROBATION,
246   PROP_MAX_DROPOUT_TIME,
247   PROP_MAX_MISORDER_TIME,
248   PROP_STATS,
249   PROP_RTP_PROFILE,
250   PROP_NTP_TIME_SOURCE,
251   PROP_RTCP_SYNC_SEND_TIME
252 };
253
254 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock (&(sess)->priv->lock)
255 #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock (&(sess)->priv->lock)
256
257 #define GST_RTP_SESSION_WAIT(sess)   g_cond_wait (&(sess)->priv->cond, &(sess)->priv->lock)
258 #define GST_RTP_SESSION_SIGNAL(sess) g_cond_signal (&(sess)->priv->cond)
259
260 struct _GstRtpSessionPrivate
261 {
262   GMutex lock;
263   GCond cond;
264   GstClock *sysclock;
265
266   RTPSession *session;
267
268   /* thread for sending out RTCP */
269   GstClockID id;
270   gboolean stop_thread;
271   GThread *thread;
272   gboolean thread_stopped;
273   gboolean wait_send;
274
275   /* caps mapping */
276   GHashTable *ptmap;
277
278   GstClockTime send_latency;
279
280   gboolean use_pipeline_clock;
281   GstRtpNtpTimeSource ntp_time_source;
282   gboolean rtcp_sync_send_time;
283
284   guint rtx_count;
285 };
286
287 /* callbacks to handle actions from the session manager */
288 static GstFlowReturn gst_rtp_session_process_rtp (RTPSession * sess,
289     RTPSource * src, GstBuffer * buffer, gpointer user_data);
290 static GstFlowReturn gst_rtp_session_send_rtp (RTPSession * sess,
291     RTPSource * src, gpointer data, gpointer user_data);
292 static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess,
293     RTPSource * src, GstBuffer * buffer, gboolean eos, gpointer user_data);
294 static GstFlowReturn gst_rtp_session_sync_rtcp (RTPSession * sess,
295     GstBuffer * buffer, gpointer user_data);
296 static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
297     gpointer user_data);
298 static void gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data);
299 static void gst_rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc,
300     gboolean all_headers, gpointer user_data);
301 static GstClockTime gst_rtp_session_request_time (RTPSession * session,
302     gpointer user_data);
303 static void gst_rtp_session_notify_nack (RTPSession * sess,
304     guint16 seqnum, guint16 blp, guint32 ssrc, gpointer user_data);
305 static void gst_rtp_session_reconfigure (RTPSession * sess, gpointer user_data);
306 static void gst_rtp_session_notify_early_rtcp (RTPSession * sess,
307     gpointer user_data);
308
309 static RTPSessionCallbacks callbacks = {
310   gst_rtp_session_process_rtp,
311   gst_rtp_session_send_rtp,
312   gst_rtp_session_sync_rtcp,
313   gst_rtp_session_send_rtcp,
314   gst_rtp_session_clock_rate,
315   gst_rtp_session_reconsider,
316   gst_rtp_session_request_key_unit,
317   gst_rtp_session_request_time,
318   gst_rtp_session_notify_nack,
319   gst_rtp_session_reconfigure,
320   gst_rtp_session_notify_early_rtcp
321 };
322
323 /* GObject vmethods */
324 static void gst_rtp_session_finalize (GObject * object);
325 static void gst_rtp_session_set_property (GObject * object, guint prop_id,
326     const GValue * value, GParamSpec * pspec);
327 static void gst_rtp_session_get_property (GObject * object, guint prop_id,
328     GValue * value, GParamSpec * pspec);
329
330 /* GstElement vmethods */
331 static GstStateChangeReturn gst_rtp_session_change_state (GstElement * element,
332     GstStateChange transition);
333 static GstPad *gst_rtp_session_request_new_pad (GstElement * element,
334     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
335 static void gst_rtp_session_release_pad (GstElement * element, GstPad * pad);
336
337 static gboolean gst_rtp_session_sink_setcaps (GstPad * pad,
338     GstRtpSession * rtpsession, GstCaps * caps);
339 static gboolean gst_rtp_session_setcaps_send_rtp (GstPad * pad,
340     GstRtpSession * rtpsession, GstCaps * caps);
341
342 static void gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession);
343
344 static GstStructure *gst_rtp_session_create_stats (GstRtpSession * rtpsession);
345
346 static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 };
347
348 static void
349 on_new_ssrc (RTPSession * session, RTPSource * src, GstRtpSession * sess)
350 {
351   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0,
352       src->ssrc);
353 }
354
355 static void
356 on_ssrc_collision (RTPSession * session, RTPSource * src, GstRtpSession * sess)
357 {
358   GstPad *send_rtp_sink;
359
360   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
361       src->ssrc);
362
363   GST_RTP_SESSION_LOCK (sess);
364   if ((send_rtp_sink = sess->send_rtp_sink))
365     gst_object_ref (send_rtp_sink);
366   GST_RTP_SESSION_UNLOCK (sess);
367
368   if (send_rtp_sink) {
369     GstStructure *structure;
370     GstEvent *event;
371     RTPSource *internal_src;
372     guint32 suggested_ssrc;
373
374     structure = gst_structure_new ("GstRTPCollision", "ssrc", G_TYPE_UINT,
375         (guint) src->ssrc, NULL);
376
377     /* if there is no source using the suggested ssrc, most probably because
378      * this ssrc has just collided, suggest upstream to use it */
379     suggested_ssrc = rtp_session_suggest_ssrc (session, NULL);
380     internal_src = rtp_session_get_source_by_ssrc (session, suggested_ssrc);
381     if (!internal_src)
382       gst_structure_set (structure, "suggested-ssrc", G_TYPE_UINT,
383           (guint) suggested_ssrc, NULL);
384     else
385       g_object_unref (internal_src);
386
387     event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, structure);
388     gst_pad_push_event (send_rtp_sink, event);
389     gst_object_unref (send_rtp_sink);
390   }
391 }
392
393 static void
394 on_ssrc_validated (RTPSession * session, RTPSource * src, GstRtpSession * sess)
395 {
396   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
397       src->ssrc);
398 }
399
400 static void
401 on_ssrc_active (RTPSession * session, RTPSource * src, GstRtpSession * sess)
402 {
403   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0,
404       src->ssrc);
405 }
406
407 static void
408 on_ssrc_sdes (RTPSession * session, RTPSource * src, GstRtpSession * sess)
409 {
410   GstStructure *s;
411   GstMessage *m;
412
413   /* convert the new SDES info into a message */
414   RTP_SESSION_LOCK (session);
415   g_object_get (src, "sdes", &s, NULL);
416   RTP_SESSION_UNLOCK (session);
417
418   m = gst_message_new_custom (GST_MESSAGE_ELEMENT, GST_OBJECT (sess), s);
419   gst_element_post_message (GST_ELEMENT_CAST (sess), m);
420
421   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0,
422       src->ssrc);
423 }
424
425 static void
426 on_bye_ssrc (RTPSession * session, RTPSource * src, GstRtpSession * sess)
427 {
428   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0,
429       src->ssrc);
430 }
431
432 static void
433 on_bye_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
434 {
435   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
436       src->ssrc);
437 }
438
439 static void
440 on_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
441 {
442   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_TIMEOUT], 0,
443       src->ssrc);
444 }
445
446 static void
447 on_sender_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
448 {
449   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
450       src->ssrc);
451 }
452
453 static void
454 on_new_sender_ssrc (RTPSession * session, RTPSource * src, GstRtpSession * sess)
455 {
456   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC], 0,
457       src->ssrc);
458 }
459
460 static void
461 on_sender_ssrc_active (RTPSession * session, RTPSource * src,
462     GstRtpSession * sess)
463 {
464   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE], 0,
465       src->ssrc);
466 }
467
468 static void
469 on_notify_stats (RTPSession * session, GParamSpec * spec,
470     GstRtpSession * rtpsession)
471 {
472   g_object_notify (G_OBJECT (rtpsession), "stats");
473 }
474
475 #define gst_rtp_session_parent_class parent_class
476 G_DEFINE_TYPE_WITH_PRIVATE (GstRtpSession, gst_rtp_session, GST_TYPE_ELEMENT);
477
478 static void
479 gst_rtp_session_class_init (GstRtpSessionClass * klass)
480 {
481   GObjectClass *gobject_class;
482   GstElementClass *gstelement_class;
483
484   gobject_class = (GObjectClass *) klass;
485   gstelement_class = (GstElementClass *) klass;
486
487   gobject_class->finalize = gst_rtp_session_finalize;
488   gobject_class->set_property = gst_rtp_session_set_property;
489   gobject_class->get_property = gst_rtp_session_get_property;
490
491   /**
492    * GstRtpSession::request-pt-map:
493    * @sess: the object which received the signal
494    * @pt: the pt
495    *
496    * Request the payload type as #GstCaps for @pt.
497    */
498   gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP] =
499       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
500       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, request_pt_map),
501       NULL, NULL, g_cclosure_marshal_generic, GST_TYPE_CAPS, 1, G_TYPE_UINT);
502   /**
503    * GstRtpSession::clear-pt-map:
504    * @sess: the object which received the signal
505    *
506    * Clear the cached pt-maps requested with #GstRtpSession::request-pt-map.
507    */
508   gst_rtp_session_signals[SIGNAL_CLEAR_PT_MAP] =
509       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
510       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
511       G_STRUCT_OFFSET (GstRtpSessionClass, clear_pt_map),
512       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 0, G_TYPE_NONE);
513
514   /**
515    * GstRtpSession::on-new-ssrc:
516    * @sess: the object which received the signal
517    * @ssrc: the SSRC
518    *
519    * Notify of a new SSRC that entered @session.
520    */
521   gst_rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
522       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
523       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_new_ssrc),
524       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
525   /**
526    * GstRtpSession::on-ssrc_collision:
527    * @sess: the object which received the signal
528    * @ssrc: the SSRC
529    *
530    * Notify when we have an SSRC collision
531    */
532   gst_rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
533       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
534       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
535           on_ssrc_collision), NULL, NULL, g_cclosure_marshal_VOID__UINT,
536       G_TYPE_NONE, 1, G_TYPE_UINT);
537   /**
538    * GstRtpSession::on-ssrc_validated:
539    * @sess: the object which received the signal
540    * @ssrc: the SSRC
541    *
542    * Notify of a new SSRC that became validated.
543    */
544   gst_rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
545       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
546       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
547           on_ssrc_validated), NULL, NULL, g_cclosure_marshal_VOID__UINT,
548       G_TYPE_NONE, 1, G_TYPE_UINT);
549   /**
550    * GstRtpSession::on-ssrc-active:
551    * @sess: the object which received the signal
552    * @ssrc: the SSRC
553    *
554    * Notify of a SSRC that is active, i.e., sending RTCP.
555    */
556   gst_rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
557       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
558       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
559           on_ssrc_active), NULL, NULL, g_cclosure_marshal_VOID__UINT,
560       G_TYPE_NONE, 1, G_TYPE_UINT);
561   /**
562    * GstRtpSession::on-ssrc-sdes:
563    * @session: the object which received the signal
564    * @src: the SSRC
565    *
566    * Notify that a new SDES was received for SSRC.
567    */
568   gst_rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
569       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
570       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_ssrc_sdes),
571       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
572
573   /**
574    * GstRtpSession::on-bye-ssrc:
575    * @sess: the object which received the signal
576    * @ssrc: the SSRC
577    *
578    * Notify of an SSRC that became inactive because of a BYE packet.
579    */
580   gst_rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
581       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
582       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_bye_ssrc),
583       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
584   /**
585    * GstRtpSession::on-bye-timeout:
586    * @sess: the object which received the signal
587    * @ssrc: the SSRC
588    *
589    * Notify of an SSRC that has timed out because of BYE
590    */
591   gst_rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
592       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
593       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_bye_timeout),
594       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
595   /**
596    * GstRtpSession::on-timeout:
597    * @sess: the object which received the signal
598    * @ssrc: the SSRC
599    *
600    * Notify of an SSRC that has timed out
601    */
602   gst_rtp_session_signals[SIGNAL_ON_TIMEOUT] =
603       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
604       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_timeout),
605       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
606   /**
607    * GstRtpSession::on-sender-timeout:
608    * @sess: the object which received the signal
609    * @ssrc: the SSRC
610    *
611    * Notify of a sender SSRC that has timed out and became a receiver
612    */
613   gst_rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
614       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
615       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
616           on_sender_timeout), NULL, NULL, g_cclosure_marshal_VOID__UINT,
617       G_TYPE_NONE, 1, G_TYPE_UINT);
618
619   /**
620    * GstRtpSession::on-new-sender-ssrc:
621    * @sess: the object which received the signal
622    * @ssrc: the sender SSRC
623    *
624    * Notify of a new sender SSRC that entered @session.
625    *
626    * Since: 1.8
627    */
628   gst_rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC] =
629       g_signal_new ("on-new-sender-ssrc", G_TYPE_FROM_CLASS (klass),
630       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_new_ssrc),
631       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
632
633   /**
634    * GstRtpSession::on-sender-ssrc-active:
635    * @sess: the object which received the signal
636    * @ssrc: the sender SSRC
637    *
638    * Notify of a sender SSRC that is active, i.e., sending RTCP.
639    *
640    * Since: 1.8
641    */
642   gst_rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE] =
643       g_signal_new ("on-sender-ssrc-active", G_TYPE_FROM_CLASS (klass),
644       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
645           on_ssrc_active), NULL, NULL, g_cclosure_marshal_VOID__UINT,
646       G_TYPE_NONE, 1, G_TYPE_UINT);
647
648   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
649       g_param_spec_double ("bandwidth", "Bandwidth",
650           "The bandwidth of the session in bytes per second (0 for auto-discover)",
651           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
652           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
653
654   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
655       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
656           "The RTCP bandwidth of the session in bytes per second "
657           "(or as a real fraction of the RTP bandwidth if < 1.0)",
658           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
659           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
660
661   g_object_class_install_property (gobject_class, PROP_RTCP_RR_BANDWIDTH,
662       g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth",
663           "The RTCP bandwidth used for receivers in bytes per second (-1 = default)",
664           -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH,
665           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
666
667   g_object_class_install_property (gobject_class, PROP_RTCP_RS_BANDWIDTH,
668       g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth",
669           "The RTCP bandwidth used for senders in bytes per second (-1 = default)",
670           -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH,
671           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
672
673   g_object_class_install_property (gobject_class, PROP_SDES,
674       g_param_spec_boxed ("sdes", "SDES",
675           "The SDES items of this session",
676           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
677
678   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
679       g_param_spec_uint ("num-sources", "Num Sources",
680           "The number of sources in the session", 0, G_MAXUINT,
681           DEFAULT_NUM_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
682
683   g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
684       g_param_spec_uint ("num-active-sources", "Num Active Sources",
685           "The number of active sources in the session", 0, G_MAXUINT,
686           DEFAULT_NUM_ACTIVE_SOURCES,
687           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
688
689   g_object_class_install_property (gobject_class, PROP_INTERNAL_SESSION,
690       g_param_spec_object ("internal-session", "Internal Session",
691           "The internal RTPSession object", RTP_TYPE_SESSION,
692           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
693
694   g_object_class_install_property (gobject_class, PROP_USE_PIPELINE_CLOCK,
695       g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock",
696           "Use the pipeline running-time to set the NTP time in the RTCP SR messages "
697           "(DEPRECATED: Use ntp-time-source property)",
698           DEFAULT_USE_PIPELINE_CLOCK,
699           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
700
701   g_object_class_install_property (gobject_class, PROP_RTCP_MIN_INTERVAL,
702       g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval",
703           "Minimum interval between Regular RTCP packet (in ns)",
704           0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
705           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
706
707   g_object_class_install_property (gobject_class, PROP_PROBATION,
708       g_param_spec_uint ("probation", "Number of probations",
709           "Consecutive packet sequence numbers to accept the source",
710           0, G_MAXUINT, DEFAULT_PROBATION,
711           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
712
713   g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
714       g_param_spec_uint ("max-dropout-time", "Max dropout time",
715           "The maximum time (milliseconds) of missing packets tolerated.",
716           0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
717           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
718
719   g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
720       g_param_spec_uint ("max-misorder-time", "Max misorder time",
721           "The maximum time (milliseconds) of misordered packets tolerated.",
722           0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
723           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
724
725   /**
726    * GstRtpSession::stats:
727    *
728    * Various session statistics. This property returns a GstStructure
729    * with name application/x-rtp-session-stats with the following fields:
730    *
731    *  "rtx-count"       G_TYPE_UINT   The number of retransmission events
732    *      received from downstream (in receiver mode)
733    *  "rtx-drop-count"  G_TYPE_UINT   The number of retransmission events
734    *      dropped (due to bandwidth constraints)
735    *  "sent-nack-count" G_TYPE_UINT   Number of NACKs sent
736    *  "recv-nack-count" G_TYPE_UINT   Number of NACKs received
737    *  "source-stats"    G_TYPE_BOXED  GValueArray of #RTPSource::stats for all
738    *      RTP sources (Since 1.8)
739    *
740    * Since: 1.4
741    */
742   g_object_class_install_property (gobject_class, PROP_STATS,
743       g_param_spec_boxed ("stats", "Statistics",
744           "Various statistics", GST_TYPE_STRUCTURE,
745           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
746
747   g_object_class_install_property (gobject_class, PROP_RTP_PROFILE,
748       g_param_spec_enum ("rtp-profile", "RTP Profile",
749           "RTP profile to use", GST_TYPE_RTP_PROFILE, DEFAULT_RTP_PROFILE,
750           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
751
752   g_object_class_install_property (gobject_class, PROP_NTP_TIME_SOURCE,
753       g_param_spec_enum ("ntp-time-source", "NTP Time Source",
754           "NTP time source for RTCP packets",
755           gst_rtp_ntp_time_source_get_type (), DEFAULT_NTP_TIME_SOURCE,
756           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
757
758   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_SEND_TIME,
759       g_param_spec_boolean ("rtcp-sync-send-time", "RTCP Sync Send Time",
760           "Use send time or capture time for RTCP sync "
761           "(TRUE = send time, FALSE = capture time)",
762           DEFAULT_RTCP_SYNC_SEND_TIME,
763           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
764
765   gstelement_class->change_state =
766       GST_DEBUG_FUNCPTR (gst_rtp_session_change_state);
767   gstelement_class->request_new_pad =
768       GST_DEBUG_FUNCPTR (gst_rtp_session_request_new_pad);
769   gstelement_class->release_pad =
770       GST_DEBUG_FUNCPTR (gst_rtp_session_release_pad);
771
772   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_session_clear_pt_map);
773
774   /* sink pads */
775   gst_element_class_add_static_pad_template (gstelement_class,
776       &rtpsession_recv_rtp_sink_template);
777   gst_element_class_add_static_pad_template (gstelement_class,
778       &rtpsession_recv_rtcp_sink_template);
779   gst_element_class_add_static_pad_template (gstelement_class,
780       &rtpsession_send_rtp_sink_template);
781
782   /* src pads */
783   gst_element_class_add_static_pad_template (gstelement_class,
784       &rtpsession_recv_rtp_src_template);
785   gst_element_class_add_static_pad_template (gstelement_class,
786       &rtpsession_sync_src_template);
787   gst_element_class_add_static_pad_template (gstelement_class,
788       &rtpsession_send_rtp_src_template);
789   gst_element_class_add_static_pad_template (gstelement_class,
790       &rtpsession_send_rtcp_src_template);
791
792   gst_element_class_set_static_metadata (gstelement_class, "RTP Session",
793       "Filter/Network/RTP",
794       "Implement an RTP session", "Wim Taymans <wim.taymans@gmail.com>");
795
796   GST_DEBUG_CATEGORY_INIT (gst_rtp_session_debug,
797       "rtpsession", 0, "RTP Session");
798 }
799
800 static void
801 gst_rtp_session_init (GstRtpSession * rtpsession)
802 {
803   rtpsession->priv = gst_rtp_session_get_instance_private (rtpsession);
804   g_mutex_init (&rtpsession->priv->lock);
805   g_cond_init (&rtpsession->priv->cond);
806   rtpsession->priv->sysclock = gst_system_clock_obtain ();
807   rtpsession->priv->session = rtp_session_new ();
808   rtpsession->priv->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK;
809   rtpsession->priv->rtcp_sync_send_time = DEFAULT_RTCP_SYNC_SEND_TIME;
810
811   /* configure callbacks */
812   rtp_session_set_callbacks (rtpsession->priv->session, &callbacks, rtpsession);
813   /* configure signals */
814   g_signal_connect (rtpsession->priv->session, "on-new-ssrc",
815       (GCallback) on_new_ssrc, rtpsession);
816   g_signal_connect (rtpsession->priv->session, "on-ssrc-collision",
817       (GCallback) on_ssrc_collision, rtpsession);
818   g_signal_connect (rtpsession->priv->session, "on-ssrc-validated",
819       (GCallback) on_ssrc_validated, rtpsession);
820   g_signal_connect (rtpsession->priv->session, "on-ssrc-active",
821       (GCallback) on_ssrc_active, rtpsession);
822   g_signal_connect (rtpsession->priv->session, "on-ssrc-sdes",
823       (GCallback) on_ssrc_sdes, rtpsession);
824   g_signal_connect (rtpsession->priv->session, "on-bye-ssrc",
825       (GCallback) on_bye_ssrc, rtpsession);
826   g_signal_connect (rtpsession->priv->session, "on-bye-timeout",
827       (GCallback) on_bye_timeout, rtpsession);
828   g_signal_connect (rtpsession->priv->session, "on-timeout",
829       (GCallback) on_timeout, rtpsession);
830   g_signal_connect (rtpsession->priv->session, "on-sender-timeout",
831       (GCallback) on_sender_timeout, rtpsession);
832   g_signal_connect (rtpsession->priv->session, "on-new-sender-ssrc",
833       (GCallback) on_new_sender_ssrc, rtpsession);
834   g_signal_connect (rtpsession->priv->session, "on-sender-ssrc-active",
835       (GCallback) on_sender_ssrc_active, rtpsession);
836   g_signal_connect (rtpsession->priv->session, "notify::stats",
837       (GCallback) on_notify_stats, rtpsession);
838   rtpsession->priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
839       (GDestroyNotify) gst_caps_unref);
840
841   gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
842   gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
843
844   rtpsession->priv->thread_stopped = TRUE;
845
846   rtpsession->priv->rtx_count = 0;
847
848   rtpsession->priv->ntp_time_source = DEFAULT_NTP_TIME_SOURCE;
849 }
850
851 static void
852 gst_rtp_session_finalize (GObject * object)
853 {
854   GstRtpSession *rtpsession;
855
856   rtpsession = GST_RTP_SESSION (object);
857
858   g_hash_table_destroy (rtpsession->priv->ptmap);
859   g_mutex_clear (&rtpsession->priv->lock);
860   g_cond_clear (&rtpsession->priv->cond);
861   g_object_unref (rtpsession->priv->sysclock);
862   g_object_unref (rtpsession->priv->session);
863
864   G_OBJECT_CLASS (parent_class)->finalize (object);
865 }
866
867 static void
868 gst_rtp_session_set_property (GObject * object, guint prop_id,
869     const GValue * value, GParamSpec * pspec)
870 {
871   GstRtpSession *rtpsession;
872   GstRtpSessionPrivate *priv;
873
874   rtpsession = GST_RTP_SESSION (object);
875   priv = rtpsession->priv;
876
877   switch (prop_id) {
878     case PROP_BANDWIDTH:
879       g_object_set_property (G_OBJECT (priv->session), "bandwidth", value);
880       break;
881     case PROP_RTCP_FRACTION:
882       g_object_set_property (G_OBJECT (priv->session), "rtcp-fraction", value);
883       break;
884     case PROP_RTCP_RR_BANDWIDTH:
885       g_object_set_property (G_OBJECT (priv->session), "rtcp-rr-bandwidth",
886           value);
887       break;
888     case PROP_RTCP_RS_BANDWIDTH:
889       g_object_set_property (G_OBJECT (priv->session), "rtcp-rs-bandwidth",
890           value);
891       break;
892     case PROP_SDES:
893       rtp_session_set_sdes_struct (priv->session, g_value_get_boxed (value));
894       break;
895     case PROP_USE_PIPELINE_CLOCK:
896       priv->use_pipeline_clock = g_value_get_boolean (value);
897       break;
898     case PROP_RTCP_MIN_INTERVAL:
899       g_object_set_property (G_OBJECT (priv->session), "rtcp-min-interval",
900           value);
901       break;
902     case PROP_PROBATION:
903       g_object_set_property (G_OBJECT (priv->session), "probation", value);
904       break;
905     case PROP_MAX_DROPOUT_TIME:
906       g_object_set_property (G_OBJECT (priv->session), "max-dropout-time",
907           value);
908       break;
909     case PROP_MAX_MISORDER_TIME:
910       g_object_set_property (G_OBJECT (priv->session), "max-misorder-time",
911           value);
912       break;
913     case PROP_RTP_PROFILE:
914       g_object_set_property (G_OBJECT (priv->session), "rtp-profile", value);
915       break;
916     case PROP_NTP_TIME_SOURCE:
917       priv->ntp_time_source = g_value_get_enum (value);
918       break;
919     case PROP_RTCP_SYNC_SEND_TIME:
920       priv->rtcp_sync_send_time = g_value_get_boolean (value);
921       break;
922     default:
923       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
924       break;
925   }
926 }
927
928 static void
929 gst_rtp_session_get_property (GObject * object, guint prop_id,
930     GValue * value, GParamSpec * pspec)
931 {
932   GstRtpSession *rtpsession;
933   GstRtpSessionPrivate *priv;
934
935   rtpsession = GST_RTP_SESSION (object);
936   priv = rtpsession->priv;
937
938   switch (prop_id) {
939     case PROP_BANDWIDTH:
940       g_object_get_property (G_OBJECT (priv->session), "bandwidth", value);
941       break;
942     case PROP_RTCP_FRACTION:
943       g_object_get_property (G_OBJECT (priv->session), "rtcp-fraction", value);
944       break;
945     case PROP_RTCP_RR_BANDWIDTH:
946       g_object_get_property (G_OBJECT (priv->session), "rtcp-rr-bandwidth",
947           value);
948       break;
949     case PROP_RTCP_RS_BANDWIDTH:
950       g_object_get_property (G_OBJECT (priv->session), "rtcp-rs-bandwidth",
951           value);
952       break;
953     case PROP_SDES:
954       g_value_take_boxed (value, rtp_session_get_sdes_struct (priv->session));
955       break;
956     case PROP_NUM_SOURCES:
957       g_value_set_uint (value, rtp_session_get_num_sources (priv->session));
958       break;
959     case PROP_NUM_ACTIVE_SOURCES:
960       g_value_set_uint (value,
961           rtp_session_get_num_active_sources (priv->session));
962       break;
963     case PROP_INTERNAL_SESSION:
964       g_value_set_object (value, priv->session);
965       break;
966     case PROP_USE_PIPELINE_CLOCK:
967       g_value_set_boolean (value, priv->use_pipeline_clock);
968       break;
969     case PROP_RTCP_MIN_INTERVAL:
970       g_object_get_property (G_OBJECT (priv->session), "rtcp-min-interval",
971           value);
972       break;
973     case PROP_PROBATION:
974       g_object_get_property (G_OBJECT (priv->session), "probation", value);
975       break;
976     case PROP_MAX_DROPOUT_TIME:
977       g_object_get_property (G_OBJECT (priv->session), "max-dropout-time",
978           value);
979       break;
980     case PROP_MAX_MISORDER_TIME:
981       g_object_get_property (G_OBJECT (priv->session), "max-misorder-time",
982           value);
983       break;
984     case PROP_STATS:
985       g_value_take_boxed (value, gst_rtp_session_create_stats (rtpsession));
986       break;
987     case PROP_RTP_PROFILE:
988       g_object_get_property (G_OBJECT (priv->session), "rtp-profile", value);
989       break;
990     case PROP_NTP_TIME_SOURCE:
991       g_value_set_enum (value, priv->ntp_time_source);
992       break;
993     case PROP_RTCP_SYNC_SEND_TIME:
994       g_value_set_boolean (value, priv->rtcp_sync_send_time);
995       break;
996     default:
997       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
998       break;
999   }
1000 }
1001
1002 static GstStructure *
1003 gst_rtp_session_create_stats (GstRtpSession * rtpsession)
1004 {
1005   GstStructure *s;
1006
1007   g_object_get (rtpsession->priv->session, "stats", &s, NULL);
1008   gst_structure_set (s, "rtx-count", G_TYPE_UINT, rtpsession->priv->rtx_count,
1009       NULL);
1010
1011   return s;
1012 }
1013
1014 static void
1015 get_current_times (GstRtpSession * rtpsession, GstClockTime * running_time,
1016     guint64 * ntpnstime)
1017 {
1018   guint64 ntpns = -1;
1019   GstClock *clock;
1020   GstClockTime base_time, rt, clock_time;
1021
1022   GST_OBJECT_LOCK (rtpsession);
1023   if ((clock = GST_ELEMENT_CLOCK (rtpsession))) {
1024     base_time = GST_ELEMENT_CAST (rtpsession)->base_time;
1025     gst_object_ref (clock);
1026     GST_OBJECT_UNLOCK (rtpsession);
1027
1028     /* get current clock time and convert to running time */
1029     clock_time = gst_clock_get_time (clock);
1030     rt = clock_time - base_time;
1031
1032     if (rtpsession->priv->use_pipeline_clock) {
1033       ntpns = rt;
1034       /* add constant to convert from 1970 based time to 1900 based time */
1035       ntpns += (2208988800LL * GST_SECOND);
1036     } else {
1037       switch (rtpsession->priv->ntp_time_source) {
1038         case GST_RTP_NTP_TIME_SOURCE_NTP:
1039         case GST_RTP_NTP_TIME_SOURCE_UNIX:{
1040           GTimeVal current;
1041
1042           /* get current NTP time */
1043           g_get_current_time (&current);
1044           ntpns = GST_TIMEVAL_TO_TIME (current);
1045
1046           /* add constant to convert from 1970 based time to 1900 based time */
1047           if (rtpsession->priv->ntp_time_source == GST_RTP_NTP_TIME_SOURCE_NTP)
1048             ntpns += (2208988800LL * GST_SECOND);
1049           break;
1050         }
1051         case GST_RTP_NTP_TIME_SOURCE_RUNNING_TIME:
1052           ntpns = rt;
1053           break;
1054         case GST_RTP_NTP_TIME_SOURCE_CLOCK_TIME:
1055           ntpns = clock_time;
1056           break;
1057         default:
1058           ntpns = -1;
1059           g_assert_not_reached ();
1060           break;
1061       }
1062     }
1063
1064     gst_object_unref (clock);
1065   } else {
1066     GST_OBJECT_UNLOCK (rtpsession);
1067     rt = -1;
1068     ntpns = -1;
1069   }
1070   if (running_time)
1071     *running_time = rt;
1072   if (ntpnstime)
1073     *ntpnstime = ntpns;
1074 }
1075
1076 /* must be called with GST_RTP_SESSION_LOCK */
1077 static void
1078 signal_waiting_rtcp_thread_unlocked (GstRtpSession * rtpsession)
1079 {
1080   if (rtpsession->priv->wait_send) {
1081     GST_LOG_OBJECT (rtpsession, "signal RTCP thread");
1082     rtpsession->priv->wait_send = FALSE;
1083     GST_RTP_SESSION_SIGNAL (rtpsession);
1084   }
1085 }
1086
1087 static void
1088 rtcp_thread (GstRtpSession * rtpsession)
1089 {
1090   GstClockID id;
1091   GstClockTime current_time;
1092   GstClockTime next_timeout;
1093   guint64 ntpnstime;
1094   GstClockTime running_time;
1095   RTPSession *session;
1096   GstClock *sysclock;
1097
1098   GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread");
1099
1100   GST_RTP_SESSION_LOCK (rtpsession);
1101
1102   while (rtpsession->priv->wait_send) {
1103     GST_LOG_OBJECT (rtpsession, "waiting for getting started");
1104     GST_RTP_SESSION_WAIT (rtpsession);
1105     GST_LOG_OBJECT (rtpsession, "signaled...");
1106   }
1107
1108   sysclock = rtpsession->priv->sysclock;
1109   current_time = gst_clock_get_time (sysclock);
1110
1111   session = rtpsession->priv->session;
1112
1113   GST_DEBUG_OBJECT (rtpsession, "starting at %" GST_TIME_FORMAT,
1114       GST_TIME_ARGS (current_time));
1115   session->start_time = current_time;
1116
1117   while (!rtpsession->priv->stop_thread) {
1118     GstClockReturn res;
1119
1120     /* get initial estimate */
1121     next_timeout = rtp_session_next_timeout (session, current_time);
1122
1123     GST_DEBUG_OBJECT (rtpsession, "next check time %" GST_TIME_FORMAT,
1124         GST_TIME_ARGS (next_timeout));
1125
1126     /* leave if no more timeouts, the session ended */
1127     if (next_timeout == GST_CLOCK_TIME_NONE)
1128       break;
1129
1130     id = rtpsession->priv->id =
1131         gst_clock_new_single_shot_id (sysclock, next_timeout);
1132     GST_RTP_SESSION_UNLOCK (rtpsession);
1133
1134     res = gst_clock_id_wait (id, NULL);
1135
1136     GST_RTP_SESSION_LOCK (rtpsession);
1137     gst_clock_id_unref (id);
1138     rtpsession->priv->id = NULL;
1139
1140     if (rtpsession->priv->stop_thread)
1141       break;
1142
1143     /* update current time */
1144     current_time = gst_clock_get_time (sysclock);
1145
1146     /* get current NTP time */
1147     get_current_times (rtpsession, &running_time, &ntpnstime);
1148
1149     /* we get unlocked because we need to perform reconsideration, don't perform
1150      * the timeout but get a new reporting estimate. */
1151     GST_DEBUG_OBJECT (rtpsession, "unlocked %d, current %" GST_TIME_FORMAT,
1152         res, GST_TIME_ARGS (current_time));
1153
1154     /* perform actions, we ignore result. Release lock because it might push. */
1155     GST_RTP_SESSION_UNLOCK (rtpsession);
1156     rtp_session_on_timeout (session, current_time, ntpnstime, running_time);
1157     GST_RTP_SESSION_LOCK (rtpsession);
1158   }
1159   /* mark the thread as stopped now */
1160   rtpsession->priv->thread_stopped = TRUE;
1161   GST_RTP_SESSION_UNLOCK (rtpsession);
1162
1163   GST_DEBUG_OBJECT (rtpsession, "leaving RTCP thread");
1164 }
1165
1166 static gboolean
1167 start_rtcp_thread (GstRtpSession * rtpsession)
1168 {
1169   GError *error = NULL;
1170   gboolean res;
1171
1172   GST_DEBUG_OBJECT (rtpsession, "starting RTCP thread");
1173
1174   GST_RTP_SESSION_LOCK (rtpsession);
1175   rtpsession->priv->stop_thread = FALSE;
1176   if (rtpsession->priv->thread_stopped) {
1177     /* if the thread stopped, and we still have a handle to the thread, join it
1178      * now. We can safely join with the lock held, the thread will not take it
1179      * anymore. */
1180     if (rtpsession->priv->thread)
1181       g_thread_join (rtpsession->priv->thread);
1182     /* only create a new thread if the old one was stopped. Otherwise we can
1183      * just reuse the currently running one. */
1184     rtpsession->priv->thread = g_thread_try_new ("rtpsession-rtcp-thread",
1185         (GThreadFunc) rtcp_thread, rtpsession, &error);
1186     rtpsession->priv->thread_stopped = FALSE;
1187   }
1188   GST_RTP_SESSION_UNLOCK (rtpsession);
1189
1190   if (error != NULL) {
1191     res = FALSE;
1192     GST_DEBUG_OBJECT (rtpsession, "failed to start thread, %s", error->message);
1193     g_error_free (error);
1194   } else {
1195     res = TRUE;
1196   }
1197   return res;
1198 }
1199
1200 static void
1201 stop_rtcp_thread (GstRtpSession * rtpsession)
1202 {
1203   GST_DEBUG_OBJECT (rtpsession, "stopping RTCP thread");
1204
1205   GST_RTP_SESSION_LOCK (rtpsession);
1206   rtpsession->priv->stop_thread = TRUE;
1207   signal_waiting_rtcp_thread_unlocked (rtpsession);
1208   if (rtpsession->priv->id)
1209     gst_clock_id_unschedule (rtpsession->priv->id);
1210   GST_RTP_SESSION_UNLOCK (rtpsession);
1211 }
1212
1213 static void
1214 join_rtcp_thread (GstRtpSession * rtpsession)
1215 {
1216   GST_RTP_SESSION_LOCK (rtpsession);
1217   /* don't try to join when we have no thread */
1218   if (rtpsession->priv->thread != NULL) {
1219     GST_DEBUG_OBJECT (rtpsession, "joining RTCP thread");
1220     GST_RTP_SESSION_UNLOCK (rtpsession);
1221
1222     g_thread_join (rtpsession->priv->thread);
1223
1224     GST_RTP_SESSION_LOCK (rtpsession);
1225     /* after the join, take the lock and clear the thread structure. The caller
1226      * is supposed to not concurrently call start and join. */
1227     rtpsession->priv->thread = NULL;
1228   }
1229   GST_RTP_SESSION_UNLOCK (rtpsession);
1230 }
1231
1232 static GstStateChangeReturn
1233 gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
1234 {
1235   GstStateChangeReturn res;
1236   GstRtpSession *rtpsession;
1237
1238   rtpsession = GST_RTP_SESSION (element);
1239
1240   switch (transition) {
1241     case GST_STATE_CHANGE_NULL_TO_READY:
1242       break;
1243     case GST_STATE_CHANGE_READY_TO_PAUSED:
1244       GST_RTP_SESSION_LOCK (rtpsession);
1245       rtpsession->priv->wait_send = TRUE;
1246       GST_RTP_SESSION_UNLOCK (rtpsession);
1247       break;
1248     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1249       break;
1250     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1251     case GST_STATE_CHANGE_PAUSED_TO_READY:
1252       /* no need to join yet, we might want to continue later. Also, the
1253        * dataflow could block downstream so that a join could just block
1254        * forever. */
1255       stop_rtcp_thread (rtpsession);
1256       break;
1257     default:
1258       break;
1259   }
1260
1261   res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1262
1263   switch (transition) {
1264     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1265       if (!start_rtcp_thread (rtpsession))
1266         goto failed_thread;
1267       break;
1268     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1269       break;
1270     case GST_STATE_CHANGE_PAUSED_TO_READY:
1271       /* downstream is now releasing the dataflow and we can join. */
1272       join_rtcp_thread (rtpsession);
1273       break;
1274     case GST_STATE_CHANGE_READY_TO_NULL:
1275       break;
1276     default:
1277       break;
1278   }
1279   return res;
1280
1281   /* ERRORS */
1282 failed_thread:
1283   {
1284     return GST_STATE_CHANGE_FAILURE;
1285   }
1286 }
1287
1288 static gboolean
1289 return_true (gpointer key, gpointer value, gpointer user_data)
1290 {
1291   return TRUE;
1292 }
1293
1294 static void
1295 gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession)
1296 {
1297   g_hash_table_foreach_remove (rtpsession->priv->ptmap, return_true, NULL);
1298 }
1299
1300 /* called when the session manager has an RTP packet or a list of packets
1301  * ready for further processing */
1302 static GstFlowReturn
1303 gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src,
1304     GstBuffer * buffer, gpointer user_data)
1305 {
1306   GstFlowReturn result;
1307   GstRtpSession *rtpsession;
1308   GstPad *rtp_src;
1309
1310   rtpsession = GST_RTP_SESSION (user_data);
1311
1312   GST_RTP_SESSION_LOCK (rtpsession);
1313   if ((rtp_src = rtpsession->recv_rtp_src))
1314     gst_object_ref (rtp_src);
1315   GST_RTP_SESSION_UNLOCK (rtpsession);
1316
1317   if (rtp_src) {
1318     GST_LOG_OBJECT (rtpsession, "pushing received RTP packet");
1319     result = gst_pad_push (rtp_src, buffer);
1320     gst_object_unref (rtp_src);
1321   } else {
1322     GST_DEBUG_OBJECT (rtpsession, "dropping received RTP packet");
1323     gst_buffer_unref (buffer);
1324     result = GST_FLOW_OK;
1325   }
1326   return result;
1327 }
1328
1329 /* called when the session manager has an RTP packet ready for further
1330  * sending */
1331 static GstFlowReturn
1332 gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src,
1333     gpointer data, gpointer user_data)
1334 {
1335   GstFlowReturn result;
1336   GstRtpSession *rtpsession;
1337   GstPad *rtp_src;
1338
1339   rtpsession = GST_RTP_SESSION (user_data);
1340
1341   GST_RTP_SESSION_LOCK (rtpsession);
1342   if ((rtp_src = rtpsession->send_rtp_src))
1343     gst_object_ref (rtp_src);
1344   signal_waiting_rtcp_thread_unlocked (rtpsession);
1345   GST_RTP_SESSION_UNLOCK (rtpsession);
1346
1347   if (rtp_src) {
1348     if (GST_IS_BUFFER (data)) {
1349       GST_LOG_OBJECT (rtpsession, "sending RTP packet");
1350       result = gst_pad_push (rtp_src, GST_BUFFER_CAST (data));
1351     } else {
1352       GST_LOG_OBJECT (rtpsession, "sending RTP list");
1353       result = gst_pad_push_list (rtp_src, GST_BUFFER_LIST_CAST (data));
1354     }
1355     gst_object_unref (rtp_src);
1356   } else {
1357     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1358     result = GST_FLOW_OK;
1359   }
1360   return result;
1361 }
1362
1363 static void
1364 do_rtcp_events (GstRtpSession * rtpsession, GstPad * srcpad)
1365 {
1366   GstCaps *caps;
1367   GstSegment seg;
1368   GstEvent *event;
1369   gchar *stream_id;
1370   gboolean have_group_id;
1371   guint group_id;
1372
1373   stream_id =
1374       g_strdup_printf ("%08x%08x%08x%08x", g_random_int (), g_random_int (),
1375       g_random_int (), g_random_int ());
1376
1377   GST_RTP_SESSION_LOCK (rtpsession);
1378   if (rtpsession->recv_rtp_sink) {
1379     event =
1380         gst_pad_get_sticky_event (rtpsession->recv_rtp_sink,
1381         GST_EVENT_STREAM_START, 0);
1382     if (event) {
1383       if (gst_event_parse_group_id (event, &group_id))
1384         have_group_id = TRUE;
1385       else
1386         have_group_id = FALSE;
1387       gst_event_unref (event);
1388     } else {
1389       have_group_id = TRUE;
1390       group_id = gst_util_group_id_next ();
1391     }
1392   } else {
1393     have_group_id = TRUE;
1394     group_id = gst_util_group_id_next ();
1395   }
1396   GST_RTP_SESSION_UNLOCK (rtpsession);
1397
1398   event = gst_event_new_stream_start (stream_id);
1399   if (have_group_id)
1400     gst_event_set_group_id (event, group_id);
1401   gst_pad_push_event (srcpad, event);
1402   g_free (stream_id);
1403
1404   caps = gst_caps_new_empty_simple ("application/x-rtcp");
1405   gst_pad_set_caps (srcpad, caps);
1406   gst_caps_unref (caps);
1407
1408   gst_segment_init (&seg, GST_FORMAT_TIME);
1409   event = gst_event_new_segment (&seg);
1410   gst_pad_push_event (srcpad, event);
1411 }
1412
1413 /* called when the session manager has an RTCP packet ready for further
1414  * sending. The eos flag is set when an EOS event should be sent downstream as
1415  * well. */
1416 static GstFlowReturn
1417 gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src,
1418     GstBuffer * buffer, gboolean eos, gpointer user_data)
1419 {
1420   GstFlowReturn result;
1421   GstRtpSession *rtpsession;
1422   GstPad *rtcp_src;
1423
1424   rtpsession = GST_RTP_SESSION (user_data);
1425
1426   GST_RTP_SESSION_LOCK (rtpsession);
1427   if (rtpsession->priv->stop_thread)
1428     goto stopping;
1429
1430   if ((rtcp_src = rtpsession->send_rtcp_src)) {
1431     gst_object_ref (rtcp_src);
1432     GST_RTP_SESSION_UNLOCK (rtpsession);
1433
1434     /* set rtcp caps on output pad */
1435     if (!gst_pad_has_current_caps (rtcp_src))
1436       do_rtcp_events (rtpsession, rtcp_src);
1437
1438     GST_LOG_OBJECT (rtpsession, "sending RTCP");
1439     result = gst_pad_push (rtcp_src, buffer);
1440
1441     /* we have to send EOS after this packet */
1442     if (eos) {
1443       GST_LOG_OBJECT (rtpsession, "sending EOS");
1444       gst_pad_push_event (rtcp_src, gst_event_new_eos ());
1445     }
1446     gst_object_unref (rtcp_src);
1447   } else {
1448     GST_RTP_SESSION_UNLOCK (rtpsession);
1449
1450     GST_DEBUG_OBJECT (rtpsession, "not sending RTCP, no output pad");
1451     gst_buffer_unref (buffer);
1452     result = GST_FLOW_OK;
1453   }
1454   return result;
1455
1456   /* ERRORS */
1457 stopping:
1458   {
1459     GST_DEBUG_OBJECT (rtpsession, "we are stopping");
1460     gst_buffer_unref (buffer);
1461     GST_RTP_SESSION_UNLOCK (rtpsession);
1462     return GST_FLOW_OK;
1463   }
1464 }
1465
1466 /* called when the session manager has an SR RTCP packet ready for handling
1467  * inter stream synchronisation */
1468 static GstFlowReturn
1469 gst_rtp_session_sync_rtcp (RTPSession * sess,
1470     GstBuffer * buffer, gpointer user_data)
1471 {
1472   GstFlowReturn result;
1473   GstRtpSession *rtpsession;
1474   GstPad *sync_src;
1475
1476   rtpsession = GST_RTP_SESSION (user_data);
1477
1478   GST_RTP_SESSION_LOCK (rtpsession);
1479   if (rtpsession->priv->stop_thread)
1480     goto stopping;
1481
1482   if ((sync_src = rtpsession->sync_src)) {
1483     gst_object_ref (sync_src);
1484     GST_RTP_SESSION_UNLOCK (rtpsession);
1485
1486     /* set rtcp caps on output pad, this happens
1487      * when we receive RTCP muxed with RTP according
1488      * to RFC5761. Otherwise we would have forwarded
1489      * the events from the recv_rtcp_sink pad already
1490      */
1491     if (!gst_pad_has_current_caps (sync_src))
1492       do_rtcp_events (rtpsession, sync_src);
1493
1494     GST_LOG_OBJECT (rtpsession, "sending Sync RTCP");
1495     result = gst_pad_push (sync_src, buffer);
1496     gst_object_unref (sync_src);
1497   } else {
1498     GST_RTP_SESSION_UNLOCK (rtpsession);
1499
1500     GST_DEBUG_OBJECT (rtpsession, "not sending Sync RTCP, no output pad");
1501     gst_buffer_unref (buffer);
1502     result = GST_FLOW_OK;
1503   }
1504   return result;
1505
1506   /* ERRORS */
1507 stopping:
1508   {
1509     GST_DEBUG_OBJECT (rtpsession, "we are stopping");
1510     gst_buffer_unref (buffer);
1511     GST_RTP_SESSION_UNLOCK (rtpsession);
1512     return GST_FLOW_OK;
1513   }
1514 }
1515
1516 static void
1517 gst_rtp_session_cache_caps (GstRtpSession * rtpsession, GstCaps * caps)
1518 {
1519   GstRtpSessionPrivate *priv;
1520   const GstStructure *s;
1521   gint payload;
1522
1523   priv = rtpsession->priv;
1524
1525   GST_DEBUG_OBJECT (rtpsession, "parsing caps");
1526
1527   s = gst_caps_get_structure (caps, 0);
1528   if (!gst_structure_get_int (s, "payload", &payload))
1529     return;
1530
1531   if (g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (payload)))
1532     return;
1533
1534   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (payload),
1535       gst_caps_ref (caps));
1536 }
1537
1538 static GstCaps *
1539 gst_rtp_session_get_caps_for_pt (GstRtpSession * rtpsession, guint payload)
1540 {
1541   GstCaps *caps = NULL;
1542   GValue args[2] = { {0}, {0} };
1543   GValue ret = { 0 };
1544
1545   GST_RTP_SESSION_LOCK (rtpsession);
1546   caps = g_hash_table_lookup (rtpsession->priv->ptmap,
1547       GINT_TO_POINTER (payload));
1548   if (caps) {
1549     gst_caps_ref (caps);
1550     goto done;
1551   }
1552
1553   /* not found in the cache, try to get it with a signal */
1554   g_value_init (&args[0], GST_TYPE_ELEMENT);
1555   g_value_set_object (&args[0], rtpsession);
1556   g_value_init (&args[1], G_TYPE_UINT);
1557   g_value_set_uint (&args[1], payload);
1558
1559   g_value_init (&ret, GST_TYPE_CAPS);
1560   g_value_set_boxed (&ret, NULL);
1561
1562   GST_RTP_SESSION_UNLOCK (rtpsession);
1563
1564   g_signal_emitv (args, gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP], 0,
1565       &ret);
1566
1567   GST_RTP_SESSION_LOCK (rtpsession);
1568
1569   g_value_unset (&args[0]);
1570   g_value_unset (&args[1]);
1571   caps = (GstCaps *) g_value_dup_boxed (&ret);
1572   g_value_unset (&ret);
1573   if (!caps)
1574     goto no_caps;
1575
1576   gst_rtp_session_cache_caps (rtpsession, caps);
1577
1578 done:
1579   GST_RTP_SESSION_UNLOCK (rtpsession);
1580
1581   return caps;
1582
1583 no_caps:
1584   {
1585     GST_DEBUG_OBJECT (rtpsession, "could not get caps");
1586     goto done;
1587   }
1588 }
1589
1590 /* called when the session manager needs the clock rate */
1591 static gint
1592 gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
1593     gpointer user_data)
1594 {
1595   gint result = -1;
1596   GstRtpSession *rtpsession;
1597   GstCaps *caps;
1598   const GstStructure *s;
1599
1600   rtpsession = GST_RTP_SESSION_CAST (user_data);
1601
1602   caps = gst_rtp_session_get_caps_for_pt (rtpsession, payload);
1603
1604   if (!caps)
1605     goto done;
1606
1607   s = gst_caps_get_structure (caps, 0);
1608   if (!gst_structure_get_int (s, "clock-rate", &result))
1609     goto no_clock_rate;
1610
1611   gst_caps_unref (caps);
1612
1613   GST_DEBUG_OBJECT (rtpsession, "parsed clock-rate %d", result);
1614
1615 done:
1616
1617   return result;
1618
1619   /* ERRORS */
1620 no_clock_rate:
1621   {
1622     gst_caps_unref (caps);
1623     GST_DEBUG_OBJECT (rtpsession, "No clock-rate in caps!");
1624     goto done;
1625   }
1626 }
1627
1628 /* called when the session manager asks us to reconsider the timeout */
1629 static void
1630 gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data)
1631 {
1632   GstRtpSession *rtpsession;
1633
1634   rtpsession = GST_RTP_SESSION_CAST (user_data);
1635
1636   GST_RTP_SESSION_LOCK (rtpsession);
1637   GST_DEBUG_OBJECT (rtpsession, "unlock timer for reconsideration");
1638   if (rtpsession->priv->id)
1639     gst_clock_id_unschedule (rtpsession->priv->id);
1640   GST_RTP_SESSION_UNLOCK (rtpsession);
1641 }
1642
1643 static gboolean
1644 gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstObject * parent,
1645     GstEvent * event)
1646 {
1647   GstRtpSession *rtpsession;
1648   gboolean ret = FALSE;
1649
1650   rtpsession = GST_RTP_SESSION (parent);
1651
1652   GST_DEBUG_OBJECT (rtpsession, "received event %s",
1653       GST_EVENT_TYPE_NAME (event));
1654
1655   switch (GST_EVENT_TYPE (event)) {
1656     case GST_EVENT_CAPS:
1657     {
1658       GstCaps *caps;
1659
1660       /* process */
1661       gst_event_parse_caps (event, &caps);
1662       gst_rtp_session_sink_setcaps (pad, rtpsession, caps);
1663       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1664       break;
1665     }
1666     case GST_EVENT_FLUSH_STOP:
1667       gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
1668       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1669       break;
1670     case GST_EVENT_SEGMENT:
1671     {
1672       GstSegment *segment, in_segment;
1673
1674       segment = &rtpsession->recv_rtp_seg;
1675
1676       /* the newsegment event is needed to convert the RTP timestamp to
1677        * running_time, which is needed to generate a mapping from RTP to NTP
1678        * timestamps in SR reports */
1679       gst_event_copy_segment (event, &in_segment);
1680       GST_DEBUG_OBJECT (rtpsession, "received segment %" GST_SEGMENT_FORMAT,
1681           &in_segment);
1682
1683       /* accept upstream */
1684       gst_segment_copy_into (&in_segment, segment);
1685
1686       /* push event forward */
1687       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1688       break;
1689     }
1690     case GST_EVENT_EOS:
1691     {
1692       GstPad *rtcp_src;
1693
1694       ret =
1695           gst_pad_push_event (rtpsession->recv_rtp_src, gst_event_ref (event));
1696
1697       GST_RTP_SESSION_LOCK (rtpsession);
1698       if ((rtcp_src = rtpsession->send_rtcp_src))
1699         gst_object_ref (rtcp_src);
1700       GST_RTP_SESSION_UNLOCK (rtpsession);
1701
1702       if (rtcp_src) {
1703         ret = gst_pad_push_event (rtcp_src, event);
1704         gst_object_unref (rtcp_src);
1705       } else {
1706         gst_event_unref (event);
1707         ret = TRUE;
1708       }
1709       break;
1710     }
1711     default:
1712       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1713       break;
1714   }
1715
1716   return ret;
1717
1718 }
1719
1720 static gboolean
1721 gst_rtp_session_request_remote_key_unit (GstRtpSession * rtpsession,
1722     guint32 ssrc, guint payload, gboolean all_headers, gint count)
1723 {
1724   GstCaps *caps;
1725
1726   caps = gst_rtp_session_get_caps_for_pt (rtpsession, payload);
1727
1728   if (caps) {
1729     const GstStructure *s = gst_caps_get_structure (caps, 0);
1730     gboolean pli;
1731     gboolean fir;
1732
1733     pli = gst_structure_has_field (s, "rtcp-fb-nack-pli");
1734     fir = gst_structure_has_field (s, "rtcp-fb-ccm-fir") && all_headers;
1735
1736     /* Google Talk uses FIR for repair, so send it even if we just want a
1737      * regular PLI */
1738     if (!pli &&
1739         gst_structure_has_field (s, "rtcp-fb-x-gstreamer-fir-as-repair"))
1740       fir = TRUE;
1741
1742     gst_caps_unref (caps);
1743
1744     if (pli || fir)
1745       return rtp_session_request_key_unit (rtpsession->priv->session, ssrc,
1746           fir, count);
1747   }
1748
1749   return FALSE;
1750 }
1751
1752 static gboolean
1753 gst_rtp_session_event_recv_rtp_src (GstPad * pad, GstObject * parent,
1754     GstEvent * event)
1755 {
1756   GstRtpSession *rtpsession;
1757   gboolean forward = TRUE;
1758   gboolean ret = TRUE;
1759   const GstStructure *s;
1760   guint32 ssrc;
1761   guint pt;
1762
1763   rtpsession = GST_RTP_SESSION (parent);
1764
1765   switch (GST_EVENT_TYPE (event)) {
1766     case GST_EVENT_CUSTOM_UPSTREAM:
1767       s = gst_event_get_structure (event);
1768       if (gst_structure_has_name (s, "GstForceKeyUnit") &&
1769           gst_structure_get_uint (s, "ssrc", &ssrc) &&
1770           gst_structure_get_uint (s, "payload", &pt)) {
1771         gboolean all_headers = FALSE;
1772         gint count = -1;
1773
1774         gst_structure_get_boolean (s, "all-headers", &all_headers);
1775         if (gst_structure_get_int (s, "count", &count) && count < 0)
1776           count += G_MAXINT;    /* Make sure count is positive if present */
1777         if (gst_rtp_session_request_remote_key_unit (rtpsession, ssrc, pt,
1778                 all_headers, count))
1779           forward = FALSE;
1780       } else if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
1781         GstClockTime running_time;
1782         guint seqnum, delay, deadline, max_delay, avg_rtt;
1783
1784         GST_RTP_SESSION_LOCK (rtpsession);
1785         rtpsession->priv->rtx_count++;
1786         GST_RTP_SESSION_UNLOCK (rtpsession);
1787
1788         if (!gst_structure_get_clock_time (s, "running-time", &running_time))
1789           running_time = -1;
1790         if (!gst_structure_get_uint (s, "ssrc", &ssrc))
1791           ssrc = -1;
1792         if (!gst_structure_get_uint (s, "seqnum", &seqnum))
1793           seqnum = -1;
1794         if (!gst_structure_get_uint (s, "delay", &delay))
1795           delay = 0;
1796         if (!gst_structure_get_uint (s, "deadline", &deadline))
1797           deadline = 100;
1798         if (!gst_structure_get_uint (s, "avg-rtt", &avg_rtt))
1799           avg_rtt = 40;
1800
1801         /* remaining time to receive the packet */
1802         max_delay = deadline;
1803         if (max_delay > delay)
1804           max_delay -= delay;
1805         /* estimated RTT */
1806         if (max_delay > avg_rtt)
1807           max_delay -= avg_rtt;
1808         else
1809           max_delay = 0;
1810
1811         if (rtp_session_request_nack (rtpsession->priv->session, ssrc, seqnum,
1812                 max_delay * GST_MSECOND))
1813           forward = FALSE;
1814       }
1815       break;
1816     default:
1817       break;
1818   }
1819
1820   if (forward) {
1821     GstPad *recv_rtp_sink;
1822
1823     GST_RTP_SESSION_LOCK (rtpsession);
1824     if ((recv_rtp_sink = rtpsession->recv_rtp_sink))
1825       gst_object_ref (recv_rtp_sink);
1826     GST_RTP_SESSION_UNLOCK (rtpsession);
1827
1828     if (recv_rtp_sink) {
1829       ret = gst_pad_push_event (recv_rtp_sink, event);
1830       gst_object_unref (recv_rtp_sink);
1831     } else
1832       gst_event_unref (event);
1833   } else {
1834     gst_event_unref (event);
1835   }
1836
1837   return ret;
1838 }
1839
1840
1841 static GstIterator *
1842 gst_rtp_session_iterate_internal_links (GstPad * pad, GstObject * parent)
1843 {
1844   GstRtpSession *rtpsession;
1845   GstPad *otherpad = NULL;
1846   GstIterator *it = NULL;
1847
1848   rtpsession = GST_RTP_SESSION (parent);
1849
1850   GST_RTP_SESSION_LOCK (rtpsession);
1851   if (pad == rtpsession->recv_rtp_src) {
1852     otherpad = gst_object_ref (rtpsession->recv_rtp_sink);
1853   } else if (pad == rtpsession->recv_rtp_sink) {
1854     otherpad = gst_object_ref (rtpsession->recv_rtp_src);
1855   } else if (pad == rtpsession->send_rtp_src) {
1856     otherpad = gst_object_ref (rtpsession->send_rtp_sink);
1857   } else if (pad == rtpsession->send_rtp_sink) {
1858     otherpad = gst_object_ref (rtpsession->send_rtp_src);
1859   }
1860   GST_RTP_SESSION_UNLOCK (rtpsession);
1861
1862   if (otherpad) {
1863     GValue val = { 0, };
1864
1865     g_value_init (&val, GST_TYPE_PAD);
1866     g_value_set_object (&val, otherpad);
1867     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
1868     g_value_unset (&val);
1869     gst_object_unref (otherpad);
1870   } else {
1871     it = gst_iterator_new_single (GST_TYPE_PAD, NULL);
1872   }
1873
1874   return it;
1875 }
1876
1877 static gboolean
1878 gst_rtp_session_sink_setcaps (GstPad * pad, GstRtpSession * rtpsession,
1879     GstCaps * caps)
1880 {
1881   GST_RTP_SESSION_LOCK (rtpsession);
1882   gst_rtp_session_cache_caps (rtpsession, caps);
1883   GST_RTP_SESSION_UNLOCK (rtpsession);
1884
1885   return TRUE;
1886 }
1887
1888 /* receive a packet from a sender, send it to the RTP session manager and
1889  * forward the packet on the rtp_src pad
1890  */
1891 static GstFlowReturn
1892 gst_rtp_session_chain_recv_rtp (GstPad * pad, GstObject * parent,
1893     GstBuffer * buffer)
1894 {
1895   GstRtpSession *rtpsession;
1896   GstRtpSessionPrivate *priv;
1897   GstFlowReturn ret;
1898   GstClockTime current_time, running_time;
1899   GstClockTime timestamp;
1900   guint64 ntpnstime;
1901
1902   rtpsession = GST_RTP_SESSION (parent);
1903   priv = rtpsession->priv;
1904
1905   GST_LOG_OBJECT (rtpsession, "received RTP packet");
1906
1907   GST_RTP_SESSION_LOCK (rtpsession);
1908   signal_waiting_rtcp_thread_unlocked (rtpsession);
1909   GST_RTP_SESSION_UNLOCK (rtpsession);
1910
1911   /* get NTP time when this packet was captured, this depends on the timestamp. */
1912   timestamp = GST_BUFFER_PTS (buffer);
1913   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
1914     /* convert to running time using the segment values */
1915     running_time =
1916         gst_segment_to_running_time (&rtpsession->recv_rtp_seg, GST_FORMAT_TIME,
1917         timestamp);
1918     ntpnstime = GST_CLOCK_TIME_NONE;
1919   } else {
1920     get_current_times (rtpsession, &running_time, &ntpnstime);
1921   }
1922   current_time = gst_clock_get_time (priv->sysclock);
1923
1924   ret = rtp_session_process_rtp (priv->session, buffer, current_time,
1925       running_time, ntpnstime);
1926   if (ret != GST_FLOW_OK)
1927     goto push_error;
1928
1929 done:
1930
1931   return ret;
1932
1933   /* ERRORS */
1934 push_error:
1935   {
1936     GST_DEBUG_OBJECT (rtpsession, "process returned %s",
1937         gst_flow_get_name (ret));
1938     goto done;
1939   }
1940 }
1941
1942 static gboolean
1943 gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstObject * parent,
1944     GstEvent * event)
1945 {
1946   GstRtpSession *rtpsession;
1947   gboolean ret = FALSE;
1948
1949   rtpsession = GST_RTP_SESSION (parent);
1950
1951   GST_DEBUG_OBJECT (rtpsession, "received event %s",
1952       GST_EVENT_TYPE_NAME (event));
1953
1954   switch (GST_EVENT_TYPE (event)) {
1955     case GST_EVENT_SEGMENT:
1956       /* Make sure that the sync_src pad has caps before the segment event.
1957        * Otherwise we might get a segment event before caps from the receive
1958        * RTCP pad, and then later when receiving RTCP packets will set caps.
1959        * This will results in a sticky event misordering warning
1960        */
1961       if (!gst_pad_has_current_caps (rtpsession->sync_src)) {
1962         GstCaps *caps = gst_caps_new_empty_simple ("application/x-rtcp");
1963         gst_pad_set_caps (rtpsession->sync_src, caps);
1964         gst_caps_unref (caps);
1965       }
1966       /* fall through */
1967     default:
1968       ret = gst_pad_push_event (rtpsession->sync_src, event);
1969       break;
1970   }
1971
1972   return ret;
1973 }
1974
1975 /* Receive an RTCP packet from a sender, send it to the RTP session manager and
1976  * forward the SR packets to the sync_src pad.
1977  */
1978 static GstFlowReturn
1979 gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstObject * parent,
1980     GstBuffer * buffer)
1981 {
1982   GstRtpSession *rtpsession;
1983   GstRtpSessionPrivate *priv;
1984   GstClockTime current_time;
1985   guint64 ntpnstime;
1986
1987   rtpsession = GST_RTP_SESSION (parent);
1988   priv = rtpsession->priv;
1989
1990   GST_LOG_OBJECT (rtpsession, "received RTCP packet");
1991
1992   GST_RTP_SESSION_LOCK (rtpsession);
1993   signal_waiting_rtcp_thread_unlocked (rtpsession);
1994   GST_RTP_SESSION_UNLOCK (rtpsession);
1995
1996   current_time = gst_clock_get_time (priv->sysclock);
1997   get_current_times (rtpsession, NULL, &ntpnstime);
1998
1999   rtp_session_process_rtcp (priv->session, buffer, current_time, ntpnstime);
2000
2001   return GST_FLOW_OK;           /* always return OK */
2002 }
2003
2004 static gboolean
2005 gst_rtp_session_query_send_rtcp_src (GstPad * pad, GstObject * parent,
2006     GstQuery * query)
2007 {
2008   GstRtpSession *rtpsession;
2009   gboolean ret = FALSE;
2010
2011   rtpsession = GST_RTP_SESSION (parent);
2012
2013   GST_DEBUG_OBJECT (rtpsession, "received QUERY %s",
2014       GST_QUERY_TYPE_NAME (query));
2015
2016   switch (GST_QUERY_TYPE (query)) {
2017     case GST_QUERY_LATENCY:
2018       ret = TRUE;
2019       /* use the defaults for the latency query. */
2020       gst_query_set_latency (query, FALSE, 0, -1);
2021       break;
2022     default:
2023       /* other queries simply fail for now */
2024       break;
2025   }
2026
2027   return ret;
2028 }
2029
2030 static gboolean
2031 gst_rtp_session_event_send_rtcp_src (GstPad * pad, GstObject * parent,
2032     GstEvent * event)
2033 {
2034   GstRtpSession *rtpsession;
2035   gboolean ret = TRUE;
2036
2037   rtpsession = GST_RTP_SESSION (parent);
2038   GST_DEBUG_OBJECT (rtpsession, "received EVENT %s",
2039       GST_EVENT_TYPE_NAME (event));
2040
2041   switch (GST_EVENT_TYPE (event)) {
2042     case GST_EVENT_SEEK:
2043     case GST_EVENT_LATENCY:
2044       gst_event_unref (event);
2045       ret = TRUE;
2046       break;
2047     default:
2048       /* other events simply fail for now */
2049       gst_event_unref (event);
2050       ret = FALSE;
2051       break;
2052   }
2053
2054   return ret;
2055 }
2056
2057
2058 static gboolean
2059 gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstObject * parent,
2060     GstEvent * event)
2061 {
2062   GstRtpSession *rtpsession;
2063   gboolean ret = FALSE;
2064
2065   rtpsession = GST_RTP_SESSION (parent);
2066
2067   GST_DEBUG_OBJECT (rtpsession, "received EVENT %s",
2068       GST_EVENT_TYPE_NAME (event));
2069
2070   switch (GST_EVENT_TYPE (event)) {
2071     case GST_EVENT_CAPS:
2072     {
2073       GstCaps *caps;
2074
2075       /* process */
2076       gst_event_parse_caps (event, &caps);
2077       gst_rtp_session_setcaps_send_rtp (pad, rtpsession, caps);
2078       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
2079       break;
2080     }
2081     case GST_EVENT_FLUSH_STOP:
2082       gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
2083       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
2084       break;
2085     case GST_EVENT_SEGMENT:{
2086       GstSegment *segment, in_segment;
2087
2088       segment = &rtpsession->send_rtp_seg;
2089
2090       /* the newsegment event is needed to convert the RTP timestamp to
2091        * running_time, which is needed to generate a mapping from RTP to NTP
2092        * timestamps in SR reports */
2093       gst_event_copy_segment (event, &in_segment);
2094       GST_DEBUG_OBJECT (rtpsession, "received segment %" GST_SEGMENT_FORMAT,
2095           &in_segment);
2096
2097       /* accept upstream */
2098       gst_segment_copy_into (&in_segment, segment);
2099
2100       /* push event forward */
2101       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
2102       break;
2103     }
2104     case GST_EVENT_EOS:{
2105       GstClockTime current_time;
2106
2107       /* push downstream FIXME, we are not supposed to leave the session just
2108        * because we stop sending. */
2109       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
2110       current_time = gst_clock_get_time (rtpsession->priv->sysclock);
2111
2112       GST_DEBUG_OBJECT (rtpsession, "scheduling BYE message");
2113       rtp_session_mark_all_bye (rtpsession->priv->session, "End Of Stream");
2114       rtp_session_schedule_bye (rtpsession->priv->session, current_time);
2115       break;
2116     }
2117     default:{
2118       GstPad *send_rtp_src;
2119
2120       GST_RTP_SESSION_LOCK (rtpsession);
2121       if ((send_rtp_src = rtpsession->send_rtp_src))
2122         gst_object_ref (send_rtp_src);
2123       GST_RTP_SESSION_UNLOCK (rtpsession);
2124
2125       if (send_rtp_src) {
2126         ret = gst_pad_push_event (send_rtp_src, event);
2127         gst_object_unref (send_rtp_src);
2128       } else
2129         gst_event_unref (event);
2130
2131       break;
2132     }
2133   }
2134
2135   return ret;
2136 }
2137
2138 static gboolean
2139 gst_rtp_session_event_send_rtp_src (GstPad * pad, GstObject * parent,
2140     GstEvent * event)
2141 {
2142   GstRtpSession *rtpsession;
2143   gboolean ret = FALSE;
2144
2145   rtpsession = GST_RTP_SESSION (parent);
2146
2147   GST_DEBUG_OBJECT (rtpsession, "received EVENT %s",
2148       GST_EVENT_TYPE_NAME (event));
2149
2150   switch (GST_EVENT_TYPE (event)) {
2151     case GST_EVENT_LATENCY:
2152       /* save the latency, we need this to know when an RTP packet will be
2153        * rendered by the sink */
2154       gst_event_parse_latency (event, &rtpsession->priv->send_latency);
2155
2156       ret = gst_pad_event_default (pad, parent, event);
2157       break;
2158     default:
2159       ret = gst_pad_event_default (pad, parent, event);
2160       break;
2161   }
2162   return ret;
2163 }
2164
2165 static GstCaps *
2166 gst_rtp_session_getcaps_send_rtp (GstPad * pad, GstRtpSession * rtpsession,
2167     GstCaps * filter)
2168 {
2169   GstRtpSessionPrivate *priv;
2170   GstCaps *result;
2171   GstStructure *s1, *s2;
2172   guint ssrc;
2173   gboolean is_random;
2174
2175   priv = rtpsession->priv;
2176
2177   ssrc = rtp_session_suggest_ssrc (priv->session, &is_random);
2178
2179   /* we can basically accept anything but we prefer to receive packets with our
2180    * internal SSRC so that we don't have to patch it. Create a structure with
2181    * the SSRC and another one without.
2182    * Only do this if the session actually decided on an ssrc already,
2183    * otherwise we give upstream the opportunity to select an ssrc itself */
2184   if (!is_random) {
2185     s1 = gst_structure_new ("application/x-rtp", "ssrc", G_TYPE_UINT, ssrc,
2186         NULL);
2187     s2 = gst_structure_new_empty ("application/x-rtp");
2188
2189     result = gst_caps_new_full (s1, s2, NULL);
2190   } else {
2191     result = gst_caps_new_empty_simple ("application/x-rtp");
2192   }
2193
2194   if (filter) {
2195     GstCaps *caps = result;
2196
2197     result = gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
2198     gst_caps_unref (caps);
2199   }
2200
2201   GST_DEBUG_OBJECT (rtpsession, "getting caps %" GST_PTR_FORMAT, result);
2202
2203   return result;
2204 }
2205
2206 static gboolean
2207 gst_rtp_session_query_send_rtp (GstPad * pad, GstObject * parent,
2208     GstQuery * query)
2209 {
2210   gboolean res = FALSE;
2211   GstRtpSession *rtpsession;
2212
2213   rtpsession = GST_RTP_SESSION (parent);
2214
2215   switch (GST_QUERY_TYPE (query)) {
2216     case GST_QUERY_CAPS:
2217     {
2218       GstCaps *filter, *caps;
2219
2220       gst_query_parse_caps (query, &filter);
2221       caps = gst_rtp_session_getcaps_send_rtp (pad, rtpsession, filter);
2222       gst_query_set_caps_result (query, caps);
2223       gst_caps_unref (caps);
2224       res = TRUE;
2225       break;
2226     }
2227     default:
2228       res = gst_pad_query_default (pad, parent, query);
2229       break;
2230   }
2231
2232   return res;
2233 }
2234
2235 static gboolean
2236 gst_rtp_session_setcaps_send_rtp (GstPad * pad, GstRtpSession * rtpsession,
2237     GstCaps * caps)
2238 {
2239   GstRtpSessionPrivate *priv;
2240
2241   priv = rtpsession->priv;
2242
2243   rtp_session_update_send_caps (priv->session, caps);
2244
2245   return TRUE;
2246 }
2247
2248 /* Recieve an RTP packet or a list of packets to be send to the receivers,
2249  * send to RTP session manager and forward to send_rtp_src.
2250  */
2251 static GstFlowReturn
2252 gst_rtp_session_chain_send_rtp_common (GstRtpSession * rtpsession,
2253     gpointer data, gboolean is_list)
2254 {
2255   GstRtpSessionPrivate *priv;
2256   GstFlowReturn ret;
2257   GstClockTime timestamp, running_time;
2258   GstClockTime current_time;
2259
2260   priv = rtpsession->priv;
2261
2262   GST_LOG_OBJECT (rtpsession, "received RTP %s", is_list ? "list" : "packet");
2263
2264   /* get NTP time when this packet was captured, this depends on the timestamp. */
2265   if (is_list) {
2266     GstBuffer *buffer = NULL;
2267
2268     /* All groups in an list have the same timestamp.
2269      * So, just take it from the first group. */
2270     buffer = gst_buffer_list_get (GST_BUFFER_LIST_CAST (data), 0);
2271     if (buffer)
2272       timestamp = GST_BUFFER_PTS (buffer);
2273     else
2274       timestamp = -1;
2275   } else {
2276     timestamp = GST_BUFFER_PTS (GST_BUFFER_CAST (data));
2277   }
2278
2279   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
2280     /* convert to running time using the segment start value. */
2281     running_time =
2282         gst_segment_to_running_time (&rtpsession->send_rtp_seg, GST_FORMAT_TIME,
2283         timestamp);
2284     if (priv->rtcp_sync_send_time)
2285       running_time += priv->send_latency;
2286   } else {
2287     /* no timestamp. */
2288     running_time = -1;
2289   }
2290
2291   current_time = gst_clock_get_time (priv->sysclock);
2292   ret = rtp_session_send_rtp (priv->session, data, is_list, current_time,
2293       running_time);
2294   if (ret != GST_FLOW_OK)
2295     goto push_error;
2296
2297 done:
2298
2299   return ret;
2300
2301   /* ERRORS */
2302 push_error:
2303   {
2304     GST_DEBUG_OBJECT (rtpsession, "process returned %s",
2305         gst_flow_get_name (ret));
2306     goto done;
2307   }
2308 }
2309
2310 static GstFlowReturn
2311 gst_rtp_session_chain_send_rtp (GstPad * pad, GstObject * parent,
2312     GstBuffer * buffer)
2313 {
2314   GstRtpSession *rtpsession = GST_RTP_SESSION (parent);
2315
2316   return gst_rtp_session_chain_send_rtp_common (rtpsession, buffer, FALSE);
2317 }
2318
2319 static GstFlowReturn
2320 gst_rtp_session_chain_send_rtp_list (GstPad * pad, GstObject * parent,
2321     GstBufferList * list)
2322 {
2323   GstRtpSession *rtpsession = GST_RTP_SESSION (parent);
2324
2325   return gst_rtp_session_chain_send_rtp_common (rtpsession, list, TRUE);
2326 }
2327
2328 /* Create sinkpad to receive RTP packets from senders. This will also create a
2329  * srcpad for the RTP packets.
2330  */
2331 static GstPad *
2332 create_recv_rtp_sink (GstRtpSession * rtpsession)
2333 {
2334   GST_DEBUG_OBJECT (rtpsession, "creating RTP sink pad");
2335
2336   rtpsession->recv_rtp_sink =
2337       gst_pad_new_from_static_template (&rtpsession_recv_rtp_sink_template,
2338       "recv_rtp_sink");
2339   gst_pad_set_chain_function (rtpsession->recv_rtp_sink,
2340       gst_rtp_session_chain_recv_rtp);
2341   gst_pad_set_event_function (rtpsession->recv_rtp_sink,
2342       gst_rtp_session_event_recv_rtp_sink);
2343   gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtp_sink,
2344       gst_rtp_session_iterate_internal_links);
2345   GST_PAD_SET_PROXY_ALLOCATION (rtpsession->recv_rtp_sink);
2346   gst_pad_set_active (rtpsession->recv_rtp_sink, TRUE);
2347   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
2348       rtpsession->recv_rtp_sink);
2349
2350   GST_DEBUG_OBJECT (rtpsession, "creating RTP src pad");
2351   rtpsession->recv_rtp_src =
2352       gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template,
2353       "recv_rtp_src");
2354   gst_pad_set_event_function (rtpsession->recv_rtp_src,
2355       gst_rtp_session_event_recv_rtp_src);
2356   gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtp_src,
2357       gst_rtp_session_iterate_internal_links);
2358   gst_pad_use_fixed_caps (rtpsession->recv_rtp_src);
2359   gst_pad_set_active (rtpsession->recv_rtp_src, TRUE);
2360   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_src);
2361
2362   return rtpsession->recv_rtp_sink;
2363 }
2364
2365 /* Remove sinkpad to receive RTP packets from senders. This will also remove
2366  * the srcpad for the RTP packets.
2367  */
2368 static void
2369 remove_recv_rtp_sink (GstRtpSession * rtpsession)
2370 {
2371   GST_DEBUG_OBJECT (rtpsession, "removing RTP sink pad");
2372
2373   /* deactivate from source to sink */
2374   gst_pad_set_active (rtpsession->recv_rtp_src, FALSE);
2375   gst_pad_set_active (rtpsession->recv_rtp_sink, FALSE);
2376
2377   /* remove pads */
2378   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2379       rtpsession->recv_rtp_sink);
2380   rtpsession->recv_rtp_sink = NULL;
2381
2382   GST_DEBUG_OBJECT (rtpsession, "removing RTP src pad");
2383   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2384       rtpsession->recv_rtp_src);
2385   rtpsession->recv_rtp_src = NULL;
2386 }
2387
2388 /* Create a sinkpad to receive RTCP messages from senders, this will also create a
2389  * sync_src pad for the SR packets.
2390  */
2391 static GstPad *
2392 create_recv_rtcp_sink (GstRtpSession * rtpsession)
2393 {
2394   GST_DEBUG_OBJECT (rtpsession, "creating RTCP sink pad");
2395
2396   rtpsession->recv_rtcp_sink =
2397       gst_pad_new_from_static_template (&rtpsession_recv_rtcp_sink_template,
2398       "recv_rtcp_sink");
2399   gst_pad_set_chain_function (rtpsession->recv_rtcp_sink,
2400       gst_rtp_session_chain_recv_rtcp);
2401   gst_pad_set_event_function (rtpsession->recv_rtcp_sink,
2402       gst_rtp_session_event_recv_rtcp_sink);
2403   gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtcp_sink,
2404       gst_rtp_session_iterate_internal_links);
2405   gst_pad_set_active (rtpsession->recv_rtcp_sink, TRUE);
2406   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
2407       rtpsession->recv_rtcp_sink);
2408
2409   GST_DEBUG_OBJECT (rtpsession, "creating sync src pad");
2410   rtpsession->sync_src =
2411       gst_pad_new_from_static_template (&rtpsession_sync_src_template,
2412       "sync_src");
2413   gst_pad_set_iterate_internal_links_function (rtpsession->sync_src,
2414       gst_rtp_session_iterate_internal_links);
2415   gst_pad_use_fixed_caps (rtpsession->sync_src);
2416   gst_pad_set_active (rtpsession->sync_src, TRUE);
2417   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->sync_src);
2418
2419   return rtpsession->recv_rtcp_sink;
2420 }
2421
2422 static void
2423 remove_recv_rtcp_sink (GstRtpSession * rtpsession)
2424 {
2425   GST_DEBUG_OBJECT (rtpsession, "removing RTCP sink pad");
2426
2427   gst_pad_set_active (rtpsession->sync_src, FALSE);
2428   gst_pad_set_active (rtpsession->recv_rtcp_sink, FALSE);
2429
2430   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2431       rtpsession->recv_rtcp_sink);
2432   rtpsession->recv_rtcp_sink = NULL;
2433
2434   GST_DEBUG_OBJECT (rtpsession, "removing sync src pad");
2435   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->sync_src);
2436   rtpsession->sync_src = NULL;
2437 }
2438
2439 /* Create a sinkpad to receive RTP packets for receivers. This will also create a
2440  * send_rtp_src pad.
2441  */
2442 static GstPad *
2443 create_send_rtp_sink (GstRtpSession * rtpsession)
2444 {
2445   GST_DEBUG_OBJECT (rtpsession, "creating pad");
2446
2447   rtpsession->send_rtp_sink =
2448       gst_pad_new_from_static_template (&rtpsession_send_rtp_sink_template,
2449       "send_rtp_sink");
2450   gst_pad_set_chain_function (rtpsession->send_rtp_sink,
2451       gst_rtp_session_chain_send_rtp);
2452   gst_pad_set_chain_list_function (rtpsession->send_rtp_sink,
2453       gst_rtp_session_chain_send_rtp_list);
2454   gst_pad_set_query_function (rtpsession->send_rtp_sink,
2455       gst_rtp_session_query_send_rtp);
2456   gst_pad_set_event_function (rtpsession->send_rtp_sink,
2457       gst_rtp_session_event_send_rtp_sink);
2458   gst_pad_set_iterate_internal_links_function (rtpsession->send_rtp_sink,
2459       gst_rtp_session_iterate_internal_links);
2460   GST_PAD_SET_PROXY_CAPS (rtpsession->send_rtp_sink);
2461   GST_PAD_SET_PROXY_ALLOCATION (rtpsession->send_rtp_sink);
2462   gst_pad_set_active (rtpsession->send_rtp_sink, TRUE);
2463   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
2464       rtpsession->send_rtp_sink);
2465
2466   rtpsession->send_rtp_src =
2467       gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template,
2468       "send_rtp_src");
2469   gst_pad_set_iterate_internal_links_function (rtpsession->send_rtp_src,
2470       gst_rtp_session_iterate_internal_links);
2471   gst_pad_set_event_function (rtpsession->send_rtp_src,
2472       gst_rtp_session_event_send_rtp_src);
2473   GST_PAD_SET_PROXY_CAPS (rtpsession->send_rtp_src);
2474   gst_pad_set_active (rtpsession->send_rtp_src, TRUE);
2475   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src);
2476
2477   return rtpsession->send_rtp_sink;
2478 }
2479
2480 static void
2481 remove_send_rtp_sink (GstRtpSession * rtpsession)
2482 {
2483   GST_DEBUG_OBJECT (rtpsession, "removing pad");
2484
2485   gst_pad_set_active (rtpsession->send_rtp_src, FALSE);
2486   gst_pad_set_active (rtpsession->send_rtp_sink, FALSE);
2487
2488   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2489       rtpsession->send_rtp_sink);
2490   rtpsession->send_rtp_sink = NULL;
2491
2492   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2493       rtpsession->send_rtp_src);
2494   rtpsession->send_rtp_src = NULL;
2495 }
2496
2497 /* Create a srcpad with the RTCP packets to send out.
2498  * This pad will be driven by the RTP session manager when it wants to send out
2499  * RTCP packets.
2500  */
2501 static GstPad *
2502 create_send_rtcp_src (GstRtpSession * rtpsession)
2503 {
2504   GST_DEBUG_OBJECT (rtpsession, "creating pad");
2505
2506   rtpsession->send_rtcp_src =
2507       gst_pad_new_from_static_template (&rtpsession_send_rtcp_src_template,
2508       "send_rtcp_src");
2509   gst_pad_use_fixed_caps (rtpsession->send_rtcp_src);
2510   gst_pad_set_active (rtpsession->send_rtcp_src, TRUE);
2511   gst_pad_set_iterate_internal_links_function (rtpsession->send_rtcp_src,
2512       gst_rtp_session_iterate_internal_links);
2513   gst_pad_set_query_function (rtpsession->send_rtcp_src,
2514       gst_rtp_session_query_send_rtcp_src);
2515   gst_pad_set_event_function (rtpsession->send_rtcp_src,
2516       gst_rtp_session_event_send_rtcp_src);
2517   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
2518       rtpsession->send_rtcp_src);
2519
2520   return rtpsession->send_rtcp_src;
2521 }
2522
2523 static void
2524 remove_send_rtcp_src (GstRtpSession * rtpsession)
2525 {
2526   GST_DEBUG_OBJECT (rtpsession, "removing pad");
2527
2528   gst_pad_set_active (rtpsession->send_rtcp_src, FALSE);
2529
2530   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2531       rtpsession->send_rtcp_src);
2532   rtpsession->send_rtcp_src = NULL;
2533 }
2534
2535 static GstPad *
2536 gst_rtp_session_request_new_pad (GstElement * element,
2537     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
2538 {
2539   GstRtpSession *rtpsession;
2540   GstElementClass *klass;
2541   GstPad *result;
2542
2543   g_return_val_if_fail (templ != NULL, NULL);
2544   g_return_val_if_fail (GST_IS_RTP_SESSION (element), NULL);
2545
2546   rtpsession = GST_RTP_SESSION (element);
2547   klass = GST_ELEMENT_GET_CLASS (element);
2548
2549   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
2550
2551   GST_RTP_SESSION_LOCK (rtpsession);
2552
2553   /* figure out the template */
2554   if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink")) {
2555     if (rtpsession->recv_rtp_sink != NULL)
2556       goto exists;
2557
2558     result = create_recv_rtp_sink (rtpsession);
2559   } else if (templ == gst_element_class_get_pad_template (klass,
2560           "recv_rtcp_sink")) {
2561     if (rtpsession->recv_rtcp_sink != NULL)
2562       goto exists;
2563
2564     result = create_recv_rtcp_sink (rtpsession);
2565   } else if (templ == gst_element_class_get_pad_template (klass,
2566           "send_rtp_sink")) {
2567     if (rtpsession->send_rtp_sink != NULL)
2568       goto exists;
2569
2570     result = create_send_rtp_sink (rtpsession);
2571   } else if (templ == gst_element_class_get_pad_template (klass,
2572           "send_rtcp_src")) {
2573     if (rtpsession->send_rtcp_src != NULL)
2574       goto exists;
2575
2576     result = create_send_rtcp_src (rtpsession);
2577   } else
2578     goto wrong_template;
2579
2580   GST_RTP_SESSION_UNLOCK (rtpsession);
2581
2582   return result;
2583
2584   /* ERRORS */
2585 wrong_template:
2586   {
2587     GST_RTP_SESSION_UNLOCK (rtpsession);
2588     g_warning ("rtpsession: this is not our template");
2589     return NULL;
2590   }
2591 exists:
2592   {
2593     GST_RTP_SESSION_UNLOCK (rtpsession);
2594     g_warning ("rtpsession: pad already requested");
2595     return NULL;
2596   }
2597 }
2598
2599 static void
2600 gst_rtp_session_release_pad (GstElement * element, GstPad * pad)
2601 {
2602   GstRtpSession *rtpsession;
2603
2604   g_return_if_fail (GST_IS_RTP_SESSION (element));
2605   g_return_if_fail (GST_IS_PAD (pad));
2606
2607   rtpsession = GST_RTP_SESSION (element);
2608
2609   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
2610
2611   GST_RTP_SESSION_LOCK (rtpsession);
2612
2613   if (rtpsession->recv_rtp_sink == pad) {
2614     remove_recv_rtp_sink (rtpsession);
2615   } else if (rtpsession->recv_rtcp_sink == pad) {
2616     remove_recv_rtcp_sink (rtpsession);
2617   } else if (rtpsession->send_rtp_sink == pad) {
2618     remove_send_rtp_sink (rtpsession);
2619   } else if (rtpsession->send_rtcp_src == pad) {
2620     remove_send_rtcp_src (rtpsession);
2621   } else
2622     goto wrong_pad;
2623
2624   GST_RTP_SESSION_UNLOCK (rtpsession);
2625
2626   return;
2627
2628   /* ERRORS */
2629 wrong_pad:
2630   {
2631     GST_RTP_SESSION_UNLOCK (rtpsession);
2632     g_warning ("rtpsession: asked to release an unknown pad");
2633     return;
2634   }
2635 }
2636
2637 static void
2638 gst_rtp_session_request_key_unit (RTPSession * sess,
2639     guint32 ssrc, gboolean all_headers, gpointer user_data)
2640 {
2641   GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
2642   GstEvent *event;
2643   GstPad *send_rtp_sink;
2644
2645   GST_RTP_SESSION_LOCK (rtpsession);
2646   if ((send_rtp_sink = rtpsession->send_rtp_sink))
2647     gst_object_ref (send_rtp_sink);
2648   GST_RTP_SESSION_UNLOCK (rtpsession);
2649
2650   if (send_rtp_sink) {
2651     event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
2652         gst_structure_new ("GstForceKeyUnit", "ssrc", G_TYPE_UINT, ssrc,
2653             "all-headers", G_TYPE_BOOLEAN, all_headers, NULL));
2654     gst_pad_push_event (send_rtp_sink, event);
2655     gst_object_unref (send_rtp_sink);
2656   }
2657 }
2658
2659 static GstClockTime
2660 gst_rtp_session_request_time (RTPSession * session, gpointer user_data)
2661 {
2662   GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
2663
2664   return gst_clock_get_time (rtpsession->priv->sysclock);
2665 }
2666
2667 static void
2668 gst_rtp_session_notify_nack (RTPSession * sess, guint16 seqnum,
2669     guint16 blp, guint32 ssrc, gpointer user_data)
2670 {
2671   GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
2672   GstEvent *event;
2673   GstPad *send_rtp_sink;
2674
2675   GST_RTP_SESSION_LOCK (rtpsession);
2676   if ((send_rtp_sink = rtpsession->send_rtp_sink))
2677     gst_object_ref (send_rtp_sink);
2678   GST_RTP_SESSION_UNLOCK (rtpsession);
2679
2680   if (send_rtp_sink) {
2681     while (TRUE) {
2682       event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
2683           gst_structure_new ("GstRTPRetransmissionRequest",
2684               "seqnum", G_TYPE_UINT, (guint) seqnum,
2685               "ssrc", G_TYPE_UINT, (guint) ssrc, NULL));
2686       gst_pad_push_event (send_rtp_sink, event);
2687
2688       if (blp == 0)
2689         break;
2690
2691       seqnum++;
2692       while ((blp & 1) == 0) {
2693         seqnum++;
2694         blp >>= 1;
2695       }
2696       blp >>= 1;
2697     }
2698     gst_object_unref (send_rtp_sink);
2699   }
2700 }
2701
2702 static void
2703 gst_rtp_session_reconfigure (RTPSession * sess, gpointer user_data)
2704 {
2705   GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
2706   GstPad *send_rtp_sink;
2707
2708   GST_RTP_SESSION_LOCK (rtpsession);
2709   if ((send_rtp_sink = rtpsession->send_rtp_sink))
2710     gst_object_ref (send_rtp_sink);
2711   GST_RTP_SESSION_UNLOCK (rtpsession);
2712
2713   if (send_rtp_sink) {
2714     gst_pad_push_event (send_rtp_sink, gst_event_new_reconfigure ());
2715     gst_object_unref (send_rtp_sink);
2716   }
2717 }
2718
2719 static void
2720 gst_rtp_session_notify_early_rtcp (RTPSession * sess, gpointer user_data)
2721 {
2722   GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
2723
2724   GST_DEBUG_OBJECT (rtpsession, "Notified of early RTCP");
2725   /* with an early RTCP request, we might have to start the RTCP thread */
2726   GST_RTP_SESSION_LOCK (rtpsession);
2727   signal_waiting_rtcp_thread_unlocked (rtpsession);
2728   GST_RTP_SESSION_UNLOCK (rtpsession);
2729 }