tizen 2.0 init
[framework/multimedia/gst-plugins-good0.10.git] / gst / rtpmanager / gstrtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 /**
21  * SECTION:element-gstrtpsession
22  * @see_also: gstrtpjitterbuffer, gstrtpbin, gstrtpptdemux, gstrtpssrcdemux
23  *
24  * The RTP session manager models one participant with a unique SSRC in an RTP
25  * session. This session can be used to send and receive RTP and RTCP packets.
26  * Based on what REQUEST pads are requested from the session manager, specific
27  * functionality can be activated.
28  *
29  * The session manager currently implements RFC 3550 including:
30  * <itemizedlist>
31  *   <listitem>
32  *     <para>RTP packet validation based on consecutive sequence numbers.</para>
33  *   </listitem>
34  *   <listitem>
35  *     <para>Maintainance of the SSRC participant database.</para>
36  *   </listitem>
37  *   <listitem>
38  *     <para>Keeping per participant statistics based on received RTCP packets.</para>
39  *   </listitem>
40  *   <listitem>
41  *     <para>Scheduling of RR/SR RTCP packets.</para>
42  *   </listitem>
43  * </itemizedlist>
44  *
45  * The gstrtpsession will not demux packets based on SSRC or payload type, nor will
46  * it correct for packet reordering and jitter. Use #GstRtpsSrcDemux,
47  * #GstRtpPtDemux and GstRtpJitterBuffer in addition to #GstRtpSession to
48  * perform these tasks. It is usually a good idea to use #GstRtpBin, which
49  * combines all these features in one element.
50  *
51  * To use #GstRtpSession as an RTP receiver, request a recv_rtp_sink pad, which will
52  * automatically create recv_rtp_src pad. Data received on the recv_rtp_sink pad
53  * will be processed in the session and after being validated forwarded on the
54  * recv_rtp_src pad.
55  *
56  * To also use #GstRtpSession as an RTCP receiver, request a recv_rtcp_sink pad,
57  * which will automatically create a sync_src pad. Packets received on the RTCP
58  * pad will be used by the session manager to update the stats and database of
59  * the other participants. SR packets will be forwarded on the sync_src pad
60  * so that they can be used to perform inter-stream synchronisation when needed.
61  *
62  * If you want the session manager to generate and send RTCP packets, request
63  * the send_rtcp_src pad. Packet pushed on this pad contain SR/RR RTCP reports
64  * that should be sent to all participants in the session.
65  *
66  * To use #GstRtpSession as a sender, request a send_rtp_sink pad, which will
67  * automatically create a send_rtp_src pad. The session manager will modify the
68  * SSRC in the RTP packets to its own SSRC and wil forward the packets on the
69  * send_rtp_src pad after updating its internal state.
70  *
71  * The session manager needs the clock-rate of the payload types it is handling
72  * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
73  * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
74  * signal.
75  *
76  * <refsect2>
77  * <title>Example pipelines</title>
78  * |[
79  * gst-launch udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink gstrtpsession .recv_rtp_src ! rtptheoradepay ! theoradec ! xvimagesink
80  * ]| Receive theora RTP packets from port 5000 and send them to the depayloader,
81  * decoder and display. Note that the application/x-rtp caps on udpsrc should be
82  * configured based on some negotiation process such as RTSP for this pipeline
83  * to work correctly.
84  * |[
85  * gst-launch udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink gstrtpsession name=session \
86  *        .recv_rtp_src ! rtptheoradepay ! theoradec ! xvimagesink \
87  *     udpsrc port=5001 caps="application/x-rtcp" ! session.recv_rtcp_sink
88  * ]| Receive theora RTP packets from port 5000 and send them to the depayloader,
89  * decoder and display. Receive RTCP packets from port 5001 and process them in
90  * the session manager.
91  * Note that the application/x-rtp caps on udpsrc should be
92  * configured based on some negotiation process such as RTSP for this pipeline
93  * to work correctly.
94  * |[
95  * gst-launch videotestsrc ! theoraenc ! rtptheorapay ! .send_rtp_sink gstrtpsession .send_rtp_src ! udpsink port=5000
96  * ]| Send theora RTP packets through the session manager and out on UDP port
97  * 5000.
98  * |[
99  * gst-launch videotestsrc ! theoraenc ! rtptheorapay ! .send_rtp_sink gstrtpsession name=session .send_rtp_src \
100  *     ! udpsink port=5000  session.send_rtcp_src ! udpsink port=5001
101  * ]| Send theora RTP packets through the session manager and out on UDP port
102  * 5000. Send RTCP packets on port 5001. Note that this pipeline will not preroll
103  * correctly because the second udpsink will not preroll correctly (no RTCP
104  * packets are sent in the PAUSED state). Applications should manually set and
105  * keep (see gst_element_set_locked_state()) the RTCP udpsink to the PLAYING state.
106  * </refsect2>
107  *
108  * Last reviewed on 2007-05-28 (0.10.5)
109  */
110
111 #ifdef HAVE_CONFIG_H
112 #include "config.h"
113 #endif
114
115 #include <gst/rtp/gstrtpbuffer.h>
116
117 #include <gst/glib-compat-private.h>
118
119 #include "gstrtpbin-marshal.h"
120 #include "gstrtpsession.h"
121 #include "rtpsession.h"
122
123 GST_DEBUG_CATEGORY_STATIC (gst_rtp_session_debug);
124 #define GST_CAT_DEFAULT gst_rtp_session_debug
125
126 /* sink pads */
127 static GstStaticPadTemplate rtpsession_recv_rtp_sink_template =
128 GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink",
129     GST_PAD_SINK,
130     GST_PAD_REQUEST,
131     GST_STATIC_CAPS ("application/x-rtp")
132     );
133
134 static GstStaticPadTemplate rtpsession_recv_rtcp_sink_template =
135 GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink",
136     GST_PAD_SINK,
137     GST_PAD_REQUEST,
138     GST_STATIC_CAPS ("application/x-rtcp")
139     );
140
141 static GstStaticPadTemplate rtpsession_send_rtp_sink_template =
142 GST_STATIC_PAD_TEMPLATE ("send_rtp_sink",
143     GST_PAD_SINK,
144     GST_PAD_REQUEST,
145     GST_STATIC_CAPS ("application/x-rtp")
146     );
147
148 /* src pads */
149 static GstStaticPadTemplate rtpsession_recv_rtp_src_template =
150 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src",
151     GST_PAD_SRC,
152     GST_PAD_SOMETIMES,
153     GST_STATIC_CAPS ("application/x-rtp")
154     );
155
156 static GstStaticPadTemplate rtpsession_sync_src_template =
157 GST_STATIC_PAD_TEMPLATE ("sync_src",
158     GST_PAD_SRC,
159     GST_PAD_SOMETIMES,
160     GST_STATIC_CAPS ("application/x-rtcp")
161     );
162
163 static GstStaticPadTemplate rtpsession_send_rtp_src_template =
164 GST_STATIC_PAD_TEMPLATE ("send_rtp_src",
165     GST_PAD_SRC,
166     GST_PAD_SOMETIMES,
167     GST_STATIC_CAPS ("application/x-rtp")
168     );
169
170 static GstStaticPadTemplate rtpsession_send_rtcp_src_template =
171 GST_STATIC_PAD_TEMPLATE ("send_rtcp_src",
172     GST_PAD_SRC,
173     GST_PAD_REQUEST,
174     GST_STATIC_CAPS ("application/x-rtcp")
175     );
176
177 /* signals and args */
178 enum
179 {
180   SIGNAL_REQUEST_PT_MAP,
181   SIGNAL_CLEAR_PT_MAP,
182
183   SIGNAL_ON_NEW_SSRC,
184   SIGNAL_ON_SSRC_COLLISION,
185   SIGNAL_ON_SSRC_VALIDATED,
186   SIGNAL_ON_SSRC_ACTIVE,
187   SIGNAL_ON_SSRC_SDES,
188   SIGNAL_ON_BYE_SSRC,
189   SIGNAL_ON_BYE_TIMEOUT,
190   SIGNAL_ON_TIMEOUT,
191   SIGNAL_ON_SENDER_TIMEOUT,
192   LAST_SIGNAL
193 };
194
195 #define DEFAULT_NTP_NS_BASE          0
196 #define DEFAULT_BANDWIDTH            RTP_STATS_BANDWIDTH
197 #define DEFAULT_RTCP_FRACTION        (RTP_STATS_BANDWIDTH * RTP_STATS_RTCP_FRACTION)
198 #define DEFAULT_RTCP_RR_BANDWIDTH    -1
199 #define DEFAULT_RTCP_RS_BANDWIDTH    -1
200 #define DEFAULT_SDES                 NULL
201 #define DEFAULT_NUM_SOURCES          0
202 #define DEFAULT_NUM_ACTIVE_SOURCES   0
203 #define DEFAULT_USE_PIPELINE_CLOCK   FALSE
204 #define DEFAULT_RTCP_MIN_INTERVAL    (RTP_STATS_MIN_INTERVAL * GST_SECOND)
205
206 enum
207 {
208   PROP_0,
209   PROP_NTP_NS_BASE,
210   PROP_BANDWIDTH,
211   PROP_RTCP_FRACTION,
212   PROP_RTCP_RR_BANDWIDTH,
213   PROP_RTCP_RS_BANDWIDTH,
214   PROP_SDES,
215   PROP_NUM_SOURCES,
216   PROP_NUM_ACTIVE_SOURCES,
217   PROP_INTERNAL_SESSION,
218   PROP_USE_PIPELINE_CLOCK,
219   PROP_RTCP_MIN_INTERVAL,
220   PROP_LAST
221 };
222
223 #define GST_RTP_SESSION_GET_PRIVATE(obj)  \
224            (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_SESSION, GstRtpSessionPrivate))
225
226 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock ((sess)->priv->lock)
227 #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock ((sess)->priv->lock)
228
229 struct _GstRtpSessionPrivate
230 {
231   GMutex *lock;
232   GstClock *sysclock;
233
234   RTPSession *session;
235
236   /* thread for sending out RTCP */
237   GstClockID id;
238   gboolean stop_thread;
239   GThread *thread;
240   gboolean thread_stopped;
241
242   /* caps mapping */
243   GHashTable *ptmap;
244
245   /* NTP base time */
246   guint64 ntpnsbase;
247   gboolean use_pipeline_clock;
248 };
249
250 /* callbacks to handle actions from the session manager */
251 static GstFlowReturn gst_rtp_session_process_rtp (RTPSession * sess,
252     RTPSource * src, GstBuffer * buffer, gpointer user_data);
253 static GstFlowReturn gst_rtp_session_send_rtp (RTPSession * sess,
254     RTPSource * src, gpointer data, gpointer user_data);
255 static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess,
256     RTPSource * src, GstBuffer * buffer, gboolean eos, gpointer user_data);
257 static GstFlowReturn gst_rtp_session_sync_rtcp (RTPSession * sess,
258     RTPSource * src, GstBuffer * buffer, gpointer user_data);
259 static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
260     gpointer user_data);
261 static void gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data);
262 static void gst_rtp_session_request_key_unit (RTPSession * sess,
263     gboolean all_headers, gpointer user_data);
264 static GstClockTime gst_rtp_session_request_time (RTPSession * session,
265     gpointer user_data);
266
267 static RTPSessionCallbacks callbacks = {
268   gst_rtp_session_process_rtp,
269   gst_rtp_session_send_rtp,
270   gst_rtp_session_sync_rtcp,
271   gst_rtp_session_send_rtcp,
272   gst_rtp_session_clock_rate,
273   gst_rtp_session_reconsider,
274   gst_rtp_session_request_key_unit,
275   gst_rtp_session_request_time
276 };
277
278 /* GObject vmethods */
279 static void gst_rtp_session_finalize (GObject * object);
280 static void gst_rtp_session_set_property (GObject * object, guint prop_id,
281     const GValue * value, GParamSpec * pspec);
282 static void gst_rtp_session_get_property (GObject * object, guint prop_id,
283     GValue * value, GParamSpec * pspec);
284
285 /* GstElement vmethods */
286 static GstStateChangeReturn gst_rtp_session_change_state (GstElement * element,
287     GstStateChange transition);
288 static GstPad *gst_rtp_session_request_new_pad (GstElement * element,
289     GstPadTemplate * templ, const gchar * name);
290 static void gst_rtp_session_release_pad (GstElement * element, GstPad * pad);
291
292 static void gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession);
293
294 static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 };
295
296 static void
297 on_new_ssrc (RTPSession * session, RTPSource * src, GstRtpSession * sess)
298 {
299   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0,
300       src->ssrc);
301 }
302
303 static void
304 on_ssrc_collision (RTPSession * session, RTPSource * src, GstRtpSession * sess)
305 {
306   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
307       src->ssrc);
308 }
309
310 static void
311 on_ssrc_validated (RTPSession * session, RTPSource * src, GstRtpSession * sess)
312 {
313   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
314       src->ssrc);
315 }
316
317 static void
318 on_ssrc_active (RTPSession * session, RTPSource * src, GstRtpSession * sess)
319 {
320   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0,
321       src->ssrc);
322 }
323
324 static void
325 on_ssrc_sdes (RTPSession * session, RTPSource * src, GstRtpSession * sess)
326 {
327   GstStructure *s;
328   GstMessage *m;
329
330   /* convert the new SDES info into a message */
331   RTP_SESSION_LOCK (session);
332   g_object_get (src, "sdes", &s, NULL);
333   RTP_SESSION_UNLOCK (session);
334
335   m = gst_message_new_custom (GST_MESSAGE_ELEMENT, GST_OBJECT (sess), s);
336   gst_element_post_message (GST_ELEMENT_CAST (sess), m);
337
338   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0,
339       src->ssrc);
340 }
341
342 static void
343 on_bye_ssrc (RTPSession * session, RTPSource * src, GstRtpSession * sess)
344 {
345   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0,
346       src->ssrc);
347 }
348
349 static void
350 on_bye_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
351 {
352   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
353       src->ssrc);
354 }
355
356 static void
357 on_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
358 {
359   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_TIMEOUT], 0,
360       src->ssrc);
361 }
362
363 static void
364 on_sender_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
365 {
366   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
367       src->ssrc);
368 }
369
370 GST_BOILERPLATE (GstRtpSession, gst_rtp_session, GstElement, GST_TYPE_ELEMENT);
371
372 static void
373 gst_rtp_session_base_init (gpointer klass)
374 {
375   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
376
377   /* sink pads */
378   gst_element_class_add_static_pad_template (element_class,
379       &rtpsession_recv_rtp_sink_template);
380   gst_element_class_add_static_pad_template (element_class,
381       &rtpsession_recv_rtcp_sink_template);
382   gst_element_class_add_static_pad_template (element_class,
383       &rtpsession_send_rtp_sink_template);
384
385   /* src pads */
386   gst_element_class_add_static_pad_template (element_class,
387       &rtpsession_recv_rtp_src_template);
388   gst_element_class_add_static_pad_template (element_class,
389       &rtpsession_sync_src_template);
390   gst_element_class_add_static_pad_template (element_class,
391       &rtpsession_send_rtp_src_template);
392   gst_element_class_add_static_pad_template (element_class,
393       &rtpsession_send_rtcp_src_template);
394
395   gst_element_class_set_details_simple (element_class, "RTP Session",
396       "Filter/Network/RTP",
397       "Implement an RTP session", "Wim Taymans <wim.taymans@gmail.com>");
398 }
399
400 static void
401 gst_rtp_session_class_init (GstRtpSessionClass * klass)
402 {
403   GObjectClass *gobject_class;
404   GstElementClass *gstelement_class;
405
406   gobject_class = (GObjectClass *) klass;
407   gstelement_class = (GstElementClass *) klass;
408
409   g_type_class_add_private (klass, sizeof (GstRtpSessionPrivate));
410
411   gobject_class->finalize = gst_rtp_session_finalize;
412   gobject_class->set_property = gst_rtp_session_set_property;
413   gobject_class->get_property = gst_rtp_session_get_property;
414
415   /**
416    * GstRtpSession::request-pt-map:
417    * @sess: the object which received the signal
418    * @pt: the pt
419    *
420    * Request the payload type as #GstCaps for @pt.
421    */
422   gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP] =
423       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
424       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, request_pt_map),
425       NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT, GST_TYPE_CAPS, 1,
426       G_TYPE_UINT);
427   /**
428    * GstRtpSession::clear-pt-map:
429    * @sess: the object which received the signal
430    *
431    * Clear the cached pt-maps requested with #GstRtpSession::request-pt-map.
432    */
433   gst_rtp_session_signals[SIGNAL_CLEAR_PT_MAP] =
434       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
435       G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpSessionClass, clear_pt_map),
436       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
437
438   /**
439    * GstRtpSession::on-new-ssrc:
440    * @sess: the object which received the signal
441    * @ssrc: the SSRC
442    *
443    * Notify of a new SSRC that entered @session.
444    */
445   gst_rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
446       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
447       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_new_ssrc),
448       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
449   /**
450    * GstRtpSession::on-ssrc_collision:
451    * @sess: the object which received the signal
452    * @ssrc: the SSRC
453    *
454    * Notify when we have an SSRC collision
455    */
456   gst_rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
457       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
458       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
459           on_ssrc_collision), NULL, NULL, g_cclosure_marshal_VOID__UINT,
460       G_TYPE_NONE, 1, G_TYPE_UINT);
461   /**
462    * GstRtpSession::on-ssrc_validated:
463    * @sess: the object which received the signal
464    * @ssrc: the SSRC
465    *
466    * Notify of a new SSRC that became validated.
467    */
468   gst_rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
469       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
470       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
471           on_ssrc_validated), NULL, NULL, g_cclosure_marshal_VOID__UINT,
472       G_TYPE_NONE, 1, G_TYPE_UINT);
473   /**
474    * GstRtpSession::on-ssrc_active:
475    * @sess: the object which received the signal
476    * @ssrc: the SSRC
477    *
478    * Notify of a SSRC that is active, i.e., sending RTCP.
479    */
480   gst_rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
481       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
482       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
483           on_ssrc_active), NULL, NULL, g_cclosure_marshal_VOID__UINT,
484       G_TYPE_NONE, 1, G_TYPE_UINT);
485   /**
486    * GstRtpSession::on-ssrc-sdes:
487    * @session: the object which received the signal
488    * @src: the SSRC
489    *
490    * Notify that a new SDES was received for SSRC.
491    */
492   gst_rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
493       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
494       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_ssrc_sdes),
495       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
496
497   /**
498    * GstRtpSession::on-bye-ssrc:
499    * @sess: the object which received the signal
500    * @ssrc: the SSRC
501    *
502    * Notify of an SSRC that became inactive because of a BYE packet.
503    */
504   gst_rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
505       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
506       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_bye_ssrc),
507       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
508   /**
509    * GstRtpSession::on-bye-timeout:
510    * @sess: the object which received the signal
511    * @ssrc: the SSRC
512    *
513    * Notify of an SSRC that has timed out because of BYE
514    */
515   gst_rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
516       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
517       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_bye_timeout),
518       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
519   /**
520    * GstRtpSession::on-timeout:
521    * @sess: the object which received the signal
522    * @ssrc: the SSRC
523    *
524    * Notify of an SSRC that has timed out
525    */
526   gst_rtp_session_signals[SIGNAL_ON_TIMEOUT] =
527       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
528       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_timeout),
529       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
530   /**
531    * GstRtpSession::on-sender-timeout:
532    * @sess: the object which received the signal
533    * @ssrc: the SSRC
534    *
535    * Notify of a sender SSRC that has timed out and became a receiver
536    */
537   gst_rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
538       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
539       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
540           on_sender_timeout), NULL, NULL, g_cclosure_marshal_VOID__UINT,
541       G_TYPE_NONE, 1, G_TYPE_UINT);
542
543   g_object_class_install_property (gobject_class, PROP_NTP_NS_BASE,
544       g_param_spec_uint64 ("ntp-ns-base", "NTP base time",
545           "The NTP base time corresponding to running_time 0 (deprecated)", 0,
546           G_MAXUINT64, DEFAULT_NTP_NS_BASE,
547           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
548
549   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
550       g_param_spec_double ("bandwidth", "Bandwidth",
551           "The bandwidth of the session in bytes per second (0 for auto-discover)",
552           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
553           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
554
555   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
556       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
557           "The RTCP bandwidth of the session in bytes per second "
558           "(or as a real fraction of the RTP bandwidth if < 1.0)",
559           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
560           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
561
562   g_object_class_install_property (gobject_class, PROP_RTCP_RR_BANDWIDTH,
563       g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth",
564           "The RTCP bandwidth used for receivers in bytes per second (-1 = default)",
565           -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH,
566           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
567
568   g_object_class_install_property (gobject_class, PROP_RTCP_RS_BANDWIDTH,
569       g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth",
570           "The RTCP bandwidth used for senders in bytes per second (-1 = default)",
571           -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH,
572           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
573
574   g_object_class_install_property (gobject_class, PROP_SDES,
575       g_param_spec_boxed ("sdes", "SDES",
576           "The SDES items of this session",
577           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
578
579   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
580       g_param_spec_uint ("num-sources", "Num Sources",
581           "The number of sources in the session", 0, G_MAXUINT,
582           DEFAULT_NUM_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
583
584   g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
585       g_param_spec_uint ("num-active-sources", "Num Active Sources",
586           "The number of active sources in the session", 0, G_MAXUINT,
587           DEFAULT_NUM_ACTIVE_SOURCES,
588           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
589
590   g_object_class_install_property (gobject_class, PROP_INTERNAL_SESSION,
591       g_param_spec_object ("internal-session", "Internal Session",
592           "The internal RTPSession object", RTP_TYPE_SESSION,
593           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
594
595   g_object_class_install_property (gobject_class, PROP_USE_PIPELINE_CLOCK,
596       g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock",
597           "Use the pipeline clock to set the NTP time in the RTCP SR messages",
598           DEFAULT_USE_PIPELINE_CLOCK,
599           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
600
601   g_object_class_install_property (gobject_class, PROP_RTCP_MIN_INTERVAL,
602       g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval",
603           "Minimum interval between Regular RTCP packet (in ns)",
604           0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
605           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
606
607   gstelement_class->change_state =
608       GST_DEBUG_FUNCPTR (gst_rtp_session_change_state);
609   gstelement_class->request_new_pad =
610       GST_DEBUG_FUNCPTR (gst_rtp_session_request_new_pad);
611   gstelement_class->release_pad =
612       GST_DEBUG_FUNCPTR (gst_rtp_session_release_pad);
613
614   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_session_clear_pt_map);
615
616   GST_DEBUG_CATEGORY_INIT (gst_rtp_session_debug,
617       "rtpsession", 0, "RTP Session");
618 }
619
620 static void
621 gst_rtp_session_init (GstRtpSession * rtpsession, GstRtpSessionClass * klass)
622 {
623   rtpsession->priv = GST_RTP_SESSION_GET_PRIVATE (rtpsession);
624   rtpsession->priv->lock = g_mutex_new ();
625   rtpsession->priv->sysclock = gst_system_clock_obtain ();
626   rtpsession->priv->session = rtp_session_new ();
627   rtpsession->priv->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK;
628
629   /* configure callbacks */
630   rtp_session_set_callbacks (rtpsession->priv->session, &callbacks, rtpsession);
631   /* configure signals */
632   g_signal_connect (rtpsession->priv->session, "on-new-ssrc",
633       (GCallback) on_new_ssrc, rtpsession);
634   g_signal_connect (rtpsession->priv->session, "on-ssrc-collision",
635       (GCallback) on_ssrc_collision, rtpsession);
636   g_signal_connect (rtpsession->priv->session, "on-ssrc-validated",
637       (GCallback) on_ssrc_validated, rtpsession);
638   g_signal_connect (rtpsession->priv->session, "on-ssrc-active",
639       (GCallback) on_ssrc_active, rtpsession);
640   g_signal_connect (rtpsession->priv->session, "on-ssrc-sdes",
641       (GCallback) on_ssrc_sdes, rtpsession);
642   g_signal_connect (rtpsession->priv->session, "on-bye-ssrc",
643       (GCallback) on_bye_ssrc, rtpsession);
644   g_signal_connect (rtpsession->priv->session, "on-bye-timeout",
645       (GCallback) on_bye_timeout, rtpsession);
646   g_signal_connect (rtpsession->priv->session, "on-timeout",
647       (GCallback) on_timeout, rtpsession);
648   g_signal_connect (rtpsession->priv->session, "on-sender-timeout",
649       (GCallback) on_sender_timeout, rtpsession);
650   rtpsession->priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
651       (GDestroyNotify) gst_caps_unref);
652
653   gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
654   gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
655
656   rtpsession->priv->thread_stopped = TRUE;
657 }
658
659 static void
660 gst_rtp_session_finalize (GObject * object)
661 {
662   GstRtpSession *rtpsession;
663
664   rtpsession = GST_RTP_SESSION (object);
665
666   g_hash_table_destroy (rtpsession->priv->ptmap);
667   g_mutex_free (rtpsession->priv->lock);
668   g_object_unref (rtpsession->priv->sysclock);
669   g_object_unref (rtpsession->priv->session);
670
671   G_OBJECT_CLASS (parent_class)->finalize (object);
672 }
673
674 static void
675 gst_rtp_session_set_property (GObject * object, guint prop_id,
676     const GValue * value, GParamSpec * pspec)
677 {
678   GstRtpSession *rtpsession;
679   GstRtpSessionPrivate *priv;
680
681   rtpsession = GST_RTP_SESSION (object);
682   priv = rtpsession->priv;
683
684   switch (prop_id) {
685     case PROP_NTP_NS_BASE:
686       GST_OBJECT_LOCK (rtpsession);
687       priv->ntpnsbase = g_value_get_uint64 (value);
688       GST_DEBUG_OBJECT (rtpsession, "setting NTP base to %" GST_TIME_FORMAT,
689           GST_TIME_ARGS (priv->ntpnsbase));
690       GST_OBJECT_UNLOCK (rtpsession);
691       break;
692     case PROP_BANDWIDTH:
693       g_object_set_property (G_OBJECT (priv->session), "bandwidth", value);
694       break;
695     case PROP_RTCP_FRACTION:
696       g_object_set_property (G_OBJECT (priv->session), "rtcp-fraction", value);
697       break;
698     case PROP_RTCP_RR_BANDWIDTH:
699       g_object_set_property (G_OBJECT (priv->session), "rtcp-rr-bandwidth",
700           value);
701       break;
702     case PROP_RTCP_RS_BANDWIDTH:
703       g_object_set_property (G_OBJECT (priv->session), "rtcp-rs-bandwidth",
704           value);
705       break;
706     case PROP_SDES:
707       rtp_session_set_sdes_struct (priv->session, g_value_get_boxed (value));
708       break;
709     case PROP_USE_PIPELINE_CLOCK:
710       priv->use_pipeline_clock = g_value_get_boolean (value);
711       break;
712     case PROP_RTCP_MIN_INTERVAL:
713       g_object_set_property (G_OBJECT (priv->session), "rtcp-min-interval",
714           value);
715       break;
716     default:
717       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
718       break;
719   }
720 }
721
722 static void
723 gst_rtp_session_get_property (GObject * object, guint prop_id,
724     GValue * value, GParamSpec * pspec)
725 {
726   GstRtpSession *rtpsession;
727   GstRtpSessionPrivate *priv;
728
729   rtpsession = GST_RTP_SESSION (object);
730   priv = rtpsession->priv;
731
732   switch (prop_id) {
733     case PROP_NTP_NS_BASE:
734       GST_OBJECT_LOCK (rtpsession);
735       g_value_set_uint64 (value, priv->ntpnsbase);
736       GST_OBJECT_UNLOCK (rtpsession);
737       break;
738     case PROP_BANDWIDTH:
739       g_object_get_property (G_OBJECT (priv->session), "bandwidth", value);
740       break;
741     case PROP_RTCP_FRACTION:
742       g_object_get_property (G_OBJECT (priv->session), "rtcp-fraction", value);
743       break;
744     case PROP_RTCP_RR_BANDWIDTH:
745       g_object_get_property (G_OBJECT (priv->session), "rtcp-rr-bandwidth",
746           value);
747       break;
748     case PROP_RTCP_RS_BANDWIDTH:
749       g_object_get_property (G_OBJECT (priv->session), "rtcp-rs-bandwidth",
750           value);
751       break;
752     case PROP_SDES:
753       g_value_take_boxed (value, rtp_session_get_sdes_struct (priv->session));
754       break;
755     case PROP_NUM_SOURCES:
756       g_value_set_uint (value, rtp_session_get_num_sources (priv->session));
757       break;
758     case PROP_NUM_ACTIVE_SOURCES:
759       g_value_set_uint (value,
760           rtp_session_get_num_active_sources (priv->session));
761       break;
762     case PROP_INTERNAL_SESSION:
763       g_value_set_object (value, priv->session);
764       break;
765     case PROP_USE_PIPELINE_CLOCK:
766       g_value_set_boolean (value, priv->use_pipeline_clock);
767       break;
768     case PROP_RTCP_MIN_INTERVAL:
769       g_object_get_property (G_OBJECT (priv->session), "rtcp-min-interval",
770           value);
771       break;
772     default:
773       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
774       break;
775   }
776 }
777
778 static void
779 get_current_times (GstRtpSession * rtpsession, GstClockTime * running_time,
780     guint64 * ntpnstime)
781 {
782   guint64 ntpns;
783   GstClock *clock;
784   GstClockTime base_time, rt, clock_time;
785
786   GST_OBJECT_LOCK (rtpsession);
787   if ((clock = GST_ELEMENT_CLOCK (rtpsession))) {
788     base_time = GST_ELEMENT_CAST (rtpsession)->base_time;
789     gst_object_ref (clock);
790     GST_OBJECT_UNLOCK (rtpsession);
791
792     clock_time = gst_clock_get_time (clock);
793
794     if (rtpsession->priv->use_pipeline_clock) {
795       ntpns = clock_time;
796     } else {
797       GTimeVal current;
798
799       /* get current NTP time */
800       g_get_current_time (&current);
801       ntpns = GST_TIMEVAL_TO_TIME (current);
802     }
803
804     /* add constant to convert from 1970 based time to 1900 based time */
805     ntpns += (2208988800LL * GST_SECOND);
806
807     /* get current clock time and convert to running time */
808     rt = clock_time - base_time;
809
810     gst_object_unref (clock);
811   } else {
812     GST_OBJECT_UNLOCK (rtpsession);
813     rt = -1;
814     ntpns = -1;
815   }
816   if (running_time)
817     *running_time = rt;
818   if (ntpnstime)
819     *ntpnstime = ntpns;
820 }
821
822 static void
823 rtcp_thread (GstRtpSession * rtpsession)
824 {
825   GstClockID id;
826   GstClockTime current_time;
827   GstClockTime next_timeout;
828   guint64 ntpnstime;
829   GstClockTime running_time;
830   RTPSession *session;
831   GstClock *sysclock;
832
833   GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread");
834
835   GST_RTP_SESSION_LOCK (rtpsession);
836
837   sysclock = rtpsession->priv->sysclock;
838   current_time = gst_clock_get_time (sysclock);
839
840   session = rtpsession->priv->session;
841
842   GST_DEBUG_OBJECT (rtpsession, "starting at %" GST_TIME_FORMAT,
843       GST_TIME_ARGS (current_time));
844   session->start_time = current_time;
845
846   while (!rtpsession->priv->stop_thread) {
847     GstClockReturn res;
848
849     /* get initial estimate */
850     next_timeout = rtp_session_next_timeout (session, current_time);
851
852     GST_DEBUG_OBJECT (rtpsession, "next check time %" GST_TIME_FORMAT,
853         GST_TIME_ARGS (next_timeout));
854
855     /* leave if no more timeouts, the session ended */
856     if (next_timeout == GST_CLOCK_TIME_NONE)
857       break;
858
859     id = rtpsession->priv->id =
860         gst_clock_new_single_shot_id (sysclock, next_timeout);
861     GST_RTP_SESSION_UNLOCK (rtpsession);
862
863     res = gst_clock_id_wait (id, NULL);
864
865     GST_RTP_SESSION_LOCK (rtpsession);
866     gst_clock_id_unref (id);
867     rtpsession->priv->id = NULL;
868
869     if (rtpsession->priv->stop_thread)
870       break;
871
872     /* update current time */
873     current_time = gst_clock_get_time (sysclock);
874
875     /* get current NTP time */
876     get_current_times (rtpsession, &running_time, &ntpnstime);
877
878     /* we get unlocked because we need to perform reconsideration, don't perform
879      * the timeout but get a new reporting estimate. */
880     GST_DEBUG_OBJECT (rtpsession, "unlocked %d, current %" GST_TIME_FORMAT,
881         res, GST_TIME_ARGS (current_time));
882
883     /* perform actions, we ignore result. Release lock because it might push. */
884     GST_RTP_SESSION_UNLOCK (rtpsession);
885     rtp_session_on_timeout (session, current_time, ntpnstime, running_time);
886     GST_RTP_SESSION_LOCK (rtpsession);
887   }
888   /* mark the thread as stopped now */
889   rtpsession->priv->thread_stopped = TRUE;
890   GST_RTP_SESSION_UNLOCK (rtpsession);
891
892   GST_DEBUG_OBJECT (rtpsession, "leaving RTCP thread");
893 }
894
895 static gboolean
896 start_rtcp_thread (GstRtpSession * rtpsession)
897 {
898   GError *error = NULL;
899   gboolean res;
900
901   GST_DEBUG_OBJECT (rtpsession, "starting RTCP thread");
902
903   GST_RTP_SESSION_LOCK (rtpsession);
904   rtpsession->priv->stop_thread = FALSE;
905   if (rtpsession->priv->thread_stopped) {
906     /* if the thread stopped, and we still have a handle to the thread, join it
907      * now. We can safely join with the lock held, the thread will not take it
908      * anymore. */
909     if (rtpsession->priv->thread)
910       g_thread_join (rtpsession->priv->thread);
911     /* only create a new thread if the old one was stopped. Otherwise we can
912      * just reuse the currently running one. */
913 #if !GLIB_CHECK_VERSION (2, 31, 0)
914     rtpsession->priv->thread =
915         g_thread_create ((GThreadFunc) rtcp_thread, rtpsession, TRUE, &error);
916 #else
917     rtpsession->priv->thread = g_thread_try_new ("rtpsession-rtcp-thread",
918         (GThreadFunc) rtcp_thread, rtpsession, &error);
919 #endif
920     rtpsession->priv->thread_stopped = FALSE;
921   }
922   GST_RTP_SESSION_UNLOCK (rtpsession);
923
924   if (error != NULL) {
925     res = FALSE;
926     GST_DEBUG_OBJECT (rtpsession, "failed to start thread, %s", error->message);
927     g_error_free (error);
928   } else {
929     res = TRUE;
930   }
931   return res;
932 }
933
934 static void
935 stop_rtcp_thread (GstRtpSession * rtpsession)
936 {
937   GST_DEBUG_OBJECT (rtpsession, "stopping RTCP thread");
938
939   GST_RTP_SESSION_LOCK (rtpsession);
940   rtpsession->priv->stop_thread = TRUE;
941   if (rtpsession->priv->id)
942     gst_clock_id_unschedule (rtpsession->priv->id);
943   GST_RTP_SESSION_UNLOCK (rtpsession);
944 }
945
946 static void
947 join_rtcp_thread (GstRtpSession * rtpsession)
948 {
949   GST_RTP_SESSION_LOCK (rtpsession);
950   /* don't try to join when we have no thread */
951   if (rtpsession->priv->thread != NULL) {
952     GST_DEBUG_OBJECT (rtpsession, "joining RTCP thread");
953     GST_RTP_SESSION_UNLOCK (rtpsession);
954
955     g_thread_join (rtpsession->priv->thread);
956
957     GST_RTP_SESSION_LOCK (rtpsession);
958     /* after the join, take the lock and clear the thread structure. The caller
959      * is supposed to not concurrently call start and join. */
960     rtpsession->priv->thread = NULL;
961   }
962   GST_RTP_SESSION_UNLOCK (rtpsession);
963 }
964
965 static GstStateChangeReturn
966 gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
967 {
968   GstStateChangeReturn res;
969   GstRtpSession *rtpsession;
970
971   rtpsession = GST_RTP_SESSION (element);
972
973   switch (transition) {
974     case GST_STATE_CHANGE_NULL_TO_READY:
975       break;
976     case GST_STATE_CHANGE_READY_TO_PAUSED:
977       break;
978     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
979       break;
980     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
981     case GST_STATE_CHANGE_PAUSED_TO_READY:
982       /* no need to join yet, we might want to continue later. Also, the
983        * dataflow could block downstream so that a join could just block
984        * forever. */
985       stop_rtcp_thread (rtpsession);
986       break;
987     default:
988       break;
989   }
990
991   res = parent_class->change_state (element, transition);
992
993   switch (transition) {
994     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
995       if (!start_rtcp_thread (rtpsession))
996         goto failed_thread;
997       break;
998     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
999       break;
1000     case GST_STATE_CHANGE_PAUSED_TO_READY:
1001       /* downstream is now releasing the dataflow and we can join. */
1002       join_rtcp_thread (rtpsession);
1003       break;
1004     case GST_STATE_CHANGE_READY_TO_NULL:
1005       break;
1006     default:
1007       break;
1008   }
1009   return res;
1010
1011   /* ERRORS */
1012 failed_thread:
1013   {
1014     return GST_STATE_CHANGE_FAILURE;
1015   }
1016 }
1017
1018 static gboolean
1019 return_true (gpointer key, gpointer value, gpointer user_data)
1020 {
1021   return TRUE;
1022 }
1023
1024 static void
1025 gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession)
1026 {
1027   g_hash_table_foreach_remove (rtpsession->priv->ptmap, return_true, NULL);
1028 }
1029
1030 /* called when the session manager has an RTP packet or a list of packets
1031  * ready for further processing */
1032 static GstFlowReturn
1033 gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src,
1034     GstBuffer * buffer, gpointer user_data)
1035 {
1036   GstFlowReturn result;
1037   GstRtpSession *rtpsession;
1038   GstPad *rtp_src;
1039
1040   rtpsession = GST_RTP_SESSION (user_data);
1041
1042   GST_RTP_SESSION_LOCK (rtpsession);
1043   if ((rtp_src = rtpsession->recv_rtp_src))
1044     gst_object_ref (rtp_src);
1045   GST_RTP_SESSION_UNLOCK (rtpsession);
1046
1047   if (rtp_src) {
1048     GST_LOG_OBJECT (rtpsession, "pushing received RTP packet");
1049     result = gst_pad_push (rtp_src, buffer);
1050     gst_object_unref (rtp_src);
1051   } else {
1052     GST_DEBUG_OBJECT (rtpsession, "dropping received RTP packet");
1053     gst_buffer_unref (buffer);
1054     result = GST_FLOW_OK;
1055   }
1056   return result;
1057 }
1058
1059 /* called when the session manager has an RTP packet ready for further
1060  * sending */
1061 static GstFlowReturn
1062 gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src,
1063     gpointer data, gpointer user_data)
1064 {
1065   GstFlowReturn result;
1066   GstRtpSession *rtpsession;
1067   GstPad *rtp_src;
1068
1069   rtpsession = GST_RTP_SESSION (user_data);
1070
1071   GST_RTP_SESSION_LOCK (rtpsession);
1072   if ((rtp_src = rtpsession->send_rtp_src))
1073     gst_object_ref (rtp_src);
1074   GST_RTP_SESSION_UNLOCK (rtpsession);
1075
1076   if (rtp_src) {
1077     if (GST_IS_BUFFER (data)) {
1078       GST_LOG_OBJECT (rtpsession, "sending RTP packet");
1079       result = gst_pad_push (rtp_src, GST_BUFFER_CAST (data));
1080     } else {
1081       GST_LOG_OBJECT (rtpsession, "sending RTP list");
1082       result = gst_pad_push_list (rtp_src, GST_BUFFER_LIST_CAST (data));
1083     }
1084     gst_object_unref (rtp_src);
1085   } else {
1086     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1087     result = GST_FLOW_OK;
1088   }
1089   return result;
1090 }
1091
1092 /* called when the session manager has an RTCP packet ready for further
1093  * sending. The eos flag is set when an EOS event should be sent downstream as
1094  * well. */
1095 static GstFlowReturn
1096 gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src,
1097     GstBuffer * buffer, gboolean eos, gpointer user_data)
1098 {
1099   GstFlowReturn result;
1100   GstRtpSession *rtpsession;
1101   GstPad *rtcp_src;
1102
1103   rtpsession = GST_RTP_SESSION (user_data);
1104
1105   GST_RTP_SESSION_LOCK (rtpsession);
1106   if (rtpsession->priv->stop_thread)
1107     goto stopping;
1108
1109   if ((rtcp_src = rtpsession->send_rtcp_src)) {
1110     GstCaps *caps;
1111
1112     /* set rtcp caps on output pad */
1113     if (!(caps = GST_PAD_CAPS (rtcp_src))) {
1114       caps = gst_caps_new_simple ("application/x-rtcp", NULL);
1115       gst_pad_set_caps (rtcp_src, caps);
1116     } else
1117       gst_caps_ref (caps);
1118     gst_buffer_set_caps (buffer, caps);
1119     gst_caps_unref (caps);
1120
1121     gst_object_ref (rtcp_src);
1122     GST_RTP_SESSION_UNLOCK (rtpsession);
1123
1124     GST_LOG_OBJECT (rtpsession, "sending RTCP");
1125     result = gst_pad_push (rtcp_src, buffer);
1126
1127     /* we have to send EOS after this packet */
1128     if (eos) {
1129       GST_LOG_OBJECT (rtpsession, "sending EOS");
1130       gst_pad_push_event (rtcp_src, gst_event_new_eos ());
1131     }
1132     gst_object_unref (rtcp_src);
1133   } else {
1134     GST_RTP_SESSION_UNLOCK (rtpsession);
1135
1136     GST_DEBUG_OBJECT (rtpsession, "not sending RTCP, no output pad");
1137     gst_buffer_unref (buffer);
1138     result = GST_FLOW_OK;
1139   }
1140   return result;
1141
1142   /* ERRORS */
1143 stopping:
1144   {
1145     GST_DEBUG_OBJECT (rtpsession, "we are stopping");
1146     gst_buffer_unref (buffer);
1147     GST_RTP_SESSION_UNLOCK (rtpsession);
1148     return GST_FLOW_OK;
1149   }
1150 }
1151
1152 /* called when the session manager has an SR RTCP packet ready for handling
1153  * inter stream synchronisation */
1154 static GstFlowReturn
1155 gst_rtp_session_sync_rtcp (RTPSession * sess, RTPSource * src,
1156     GstBuffer * buffer, gpointer user_data)
1157 {
1158   GstFlowReturn result;
1159   GstRtpSession *rtpsession;
1160   GstPad *sync_src;
1161
1162   rtpsession = GST_RTP_SESSION (user_data);
1163
1164   GST_RTP_SESSION_LOCK (rtpsession);
1165   if (rtpsession->priv->stop_thread)
1166     goto stopping;
1167
1168   if ((sync_src = rtpsession->sync_src)) {
1169     GstCaps *caps;
1170
1171     /* set rtcp caps on output pad */
1172     if (!(caps = GST_PAD_CAPS (sync_src))) {
1173       caps = gst_caps_new_simple ("application/x-rtcp", NULL);
1174       gst_pad_set_caps (sync_src, caps);
1175     } else
1176       gst_caps_ref (caps);
1177     gst_buffer_set_caps (buffer, caps);
1178     gst_caps_unref (caps);
1179
1180     gst_object_ref (sync_src);
1181     GST_RTP_SESSION_UNLOCK (rtpsession);
1182
1183     GST_LOG_OBJECT (rtpsession, "sending Sync RTCP");
1184     result = gst_pad_push (sync_src, buffer);
1185     gst_object_unref (sync_src);
1186   } else {
1187     GST_RTP_SESSION_UNLOCK (rtpsession);
1188
1189     GST_DEBUG_OBJECT (rtpsession, "not sending Sync RTCP, no output pad");
1190     gst_buffer_unref (buffer);
1191     result = GST_FLOW_OK;
1192   }
1193   return result;
1194
1195   /* ERRORS */
1196 stopping:
1197   {
1198     GST_DEBUG_OBJECT (rtpsession, "we are stopping");
1199     gst_buffer_unref (buffer);
1200     GST_RTP_SESSION_UNLOCK (rtpsession);
1201     return GST_FLOW_OK;
1202   }
1203 }
1204
1205 static void
1206 gst_rtp_session_cache_caps (GstRtpSession * rtpsession, GstCaps * caps)
1207 {
1208   GstRtpSessionPrivate *priv;
1209   const GstStructure *s;
1210   gint payload;
1211
1212   priv = rtpsession->priv;
1213
1214   GST_DEBUG_OBJECT (rtpsession, "parsing caps");
1215
1216   s = gst_caps_get_structure (caps, 0);
1217   if (!gst_structure_get_int (s, "payload", &payload))
1218     return;
1219
1220   if (g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (payload)))
1221     return;
1222
1223   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (payload),
1224       gst_caps_ref (caps));
1225 }
1226
1227 static GstCaps *
1228 gst_rtp_session_get_caps_for_pt (GstRtpSession * rtpsession, guint payload)
1229 {
1230   GstCaps *caps = NULL;
1231   GValue args[2] = { {0}, {0} };
1232   GValue ret = { 0 };
1233
1234   GST_RTP_SESSION_LOCK (rtpsession);
1235   caps = g_hash_table_lookup (rtpsession->priv->ptmap,
1236       GINT_TO_POINTER (payload));
1237   if (caps) {
1238     gst_caps_ref (caps);
1239     goto done;
1240   }
1241
1242   /* not found in the cache, try to get it with a signal */
1243   g_value_init (&args[0], GST_TYPE_ELEMENT);
1244   g_value_set_object (&args[0], rtpsession);
1245   g_value_init (&args[1], G_TYPE_UINT);
1246   g_value_set_uint (&args[1], payload);
1247
1248   g_value_init (&ret, GST_TYPE_CAPS);
1249   g_value_set_boxed (&ret, NULL);
1250
1251   GST_RTP_SESSION_UNLOCK (rtpsession);
1252
1253   g_signal_emitv (args, gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP], 0,
1254       &ret);
1255
1256   GST_RTP_SESSION_LOCK (rtpsession);
1257
1258   g_value_unset (&args[0]);
1259   g_value_unset (&args[1]);
1260   caps = (GstCaps *) g_value_dup_boxed (&ret);
1261   g_value_unset (&ret);
1262   if (!caps)
1263     goto no_caps;
1264
1265   gst_rtp_session_cache_caps (rtpsession, caps);
1266
1267 done:
1268   GST_RTP_SESSION_UNLOCK (rtpsession);
1269
1270   return caps;
1271
1272 no_caps:
1273   {
1274     GST_DEBUG_OBJECT (rtpsession, "could not get caps");
1275     goto done;
1276   }
1277 }
1278
1279 /* called when the session manager needs the clock rate */
1280 static gint
1281 gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
1282     gpointer user_data)
1283 {
1284   gint result = -1;
1285   GstRtpSession *rtpsession;
1286   GstCaps *caps;
1287   const GstStructure *s;
1288
1289   rtpsession = GST_RTP_SESSION_CAST (user_data);
1290
1291   caps = gst_rtp_session_get_caps_for_pt (rtpsession, payload);
1292
1293   if (!caps)
1294     goto done;
1295
1296   s = gst_caps_get_structure (caps, 0);
1297   if (!gst_structure_get_int (s, "clock-rate", &result))
1298     goto no_clock_rate;
1299
1300   gst_caps_unref (caps);
1301
1302   GST_DEBUG_OBJECT (rtpsession, "parsed clock-rate %d", result);
1303
1304 done:
1305
1306   return result;
1307
1308   /* ERRORS */
1309 no_clock_rate:
1310   {
1311     gst_caps_unref (caps);
1312     GST_DEBUG_OBJECT (rtpsession, "No clock-rate in caps!");
1313     goto done;
1314   }
1315 }
1316
1317 /* called when the session manager asks us to reconsider the timeout */
1318 static void
1319 gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data)
1320 {
1321   GstRtpSession *rtpsession;
1322
1323   rtpsession = GST_RTP_SESSION_CAST (user_data);
1324
1325   GST_RTP_SESSION_LOCK (rtpsession);
1326   GST_DEBUG_OBJECT (rtpsession, "unlock timer for reconsideration");
1327   if (rtpsession->priv->id)
1328     gst_clock_id_unschedule (rtpsession->priv->id);
1329   GST_RTP_SESSION_UNLOCK (rtpsession);
1330 }
1331
1332 static gboolean
1333 gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event)
1334 {
1335   GstRtpSession *rtpsession;
1336   gboolean ret = FALSE;
1337
1338   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1339   if (G_UNLIKELY (rtpsession == NULL)) {
1340     gst_event_unref (event);
1341     return FALSE;
1342   }
1343
1344   GST_DEBUG_OBJECT (rtpsession, "received event %s",
1345       GST_EVENT_TYPE_NAME (event));
1346
1347   switch (GST_EVENT_TYPE (event)) {
1348     case GST_EVENT_FLUSH_STOP:
1349       gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
1350       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1351       break;
1352     case GST_EVENT_NEWSEGMENT:
1353     {
1354       gboolean update;
1355       gdouble rate, arate;
1356       GstFormat format;
1357       gint64 start, stop, time;
1358       GstSegment *segment;
1359
1360       segment = &rtpsession->recv_rtp_seg;
1361
1362       /* the newsegment event is needed to convert the RTP timestamp to
1363        * running_time, which is needed to generate a mapping from RTP to NTP
1364        * timestamps in SR reports */
1365       gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1366           &start, &stop, &time);
1367
1368       GST_DEBUG_OBJECT (rtpsession,
1369           "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1370           "format GST_FORMAT_TIME, "
1371           "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1372           ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1373           update, rate, arate, GST_TIME_ARGS (segment->start),
1374           GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1375           GST_TIME_ARGS (segment->accum));
1376
1377       gst_segment_set_newsegment_full (segment, update, rate,
1378           arate, format, start, stop, time);
1379
1380       /* push event forward */
1381       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1382       break;
1383     }
1384     default:
1385       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1386       break;
1387   }
1388   gst_object_unref (rtpsession);
1389
1390   return ret;
1391
1392 }
1393
1394 static gboolean
1395 gst_rtp_session_request_remote_key_unit (GstRtpSession * rtpsession,
1396     guint32 ssrc, guint payload, gboolean all_headers, gint count)
1397 {
1398   GstCaps *caps;
1399
1400   caps = gst_rtp_session_get_caps_for_pt (rtpsession, payload);
1401
1402   if (caps) {
1403     const GstStructure *s = gst_caps_get_structure (caps, 0);
1404     gboolean pli;
1405     gboolean fir;
1406
1407     pli = gst_structure_has_field (s, "rtcp-fb-nack-pli");
1408     fir = gst_structure_has_field (s, "rtcp-fb-ccm-fir") && all_headers;
1409
1410     /* Google Talk uses FIR for repair, so send it even if we just want a
1411      * regular PLI */
1412     if (!pli &&
1413         gst_structure_has_field (s, "rtcp-fb-x-gstreamer-fir-as-repair"))
1414       fir = TRUE;
1415
1416     gst_caps_unref (caps);
1417
1418     if (pli || fir)
1419       return rtp_session_request_key_unit (rtpsession->priv->session, ssrc,
1420           gst_clock_get_time (rtpsession->priv->sysclock), fir, count);
1421   }
1422
1423   return FALSE;
1424 }
1425
1426 static gboolean
1427 gst_rtp_session_event_recv_rtp_src (GstPad * pad, GstEvent * event)
1428 {
1429   GstRtpSession *rtpsession;
1430   gboolean forward = TRUE;
1431   gboolean ret = TRUE;
1432   const GstStructure *s;
1433   guint32 ssrc;
1434   guint pt;
1435
1436   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1437   if (G_UNLIKELY (rtpsession == NULL)) {
1438     gst_event_unref (event);
1439     return FALSE;
1440   }
1441
1442   switch (GST_EVENT_TYPE (event)) {
1443     case GST_EVENT_CUSTOM_UPSTREAM:
1444       s = gst_event_get_structure (event);
1445       if (gst_structure_has_name (s, "GstForceKeyUnit") &&
1446           gst_structure_get_uint (s, "ssrc", &ssrc) &&
1447           gst_structure_get_uint (s, "payload", &pt)) {
1448         gboolean all_headers = FALSE;
1449         gint count = -1;
1450
1451         gst_structure_get_boolean (s, "all-headers", &all_headers);
1452         if (gst_structure_get_int (s, "count", &count) && count < 0)
1453           count += G_MAXINT;    /* Make sure count is positive if present */
1454         if (gst_rtp_session_request_remote_key_unit (rtpsession, ssrc, pt,
1455                 all_headers, count))
1456           forward = FALSE;
1457       }
1458       break;
1459     default:
1460       break;
1461   }
1462
1463   if (forward)
1464     ret = gst_pad_push_event (rtpsession->recv_rtp_sink, event);
1465
1466   gst_object_unref (rtpsession);
1467
1468   return ret;
1469 }
1470
1471
1472 static GstIterator *
1473 gst_rtp_session_iterate_internal_links (GstPad * pad)
1474 {
1475   GstRtpSession *rtpsession;
1476   GstPad *otherpad = NULL;
1477   GstIterator *it = NULL;
1478
1479   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1480   if (G_UNLIKELY (rtpsession == NULL))
1481     return NULL;
1482
1483   GST_RTP_SESSION_LOCK (rtpsession);
1484   if (pad == rtpsession->recv_rtp_src) {
1485     otherpad = gst_object_ref (rtpsession->recv_rtp_sink);
1486   } else if (pad == rtpsession->recv_rtp_sink) {
1487     otherpad = gst_object_ref (rtpsession->recv_rtp_src);
1488   } else if (pad == rtpsession->send_rtp_src) {
1489     otherpad = gst_object_ref (rtpsession->send_rtp_sink);
1490   } else if (pad == rtpsession->send_rtp_sink) {
1491     otherpad = gst_object_ref (rtpsession->send_rtp_src);
1492   }
1493   GST_RTP_SESSION_UNLOCK (rtpsession);
1494
1495   if (otherpad) {
1496     it = gst_iterator_new_single (GST_TYPE_PAD, otherpad,
1497         (GstCopyFunction) gst_object_ref, (GFreeFunc) gst_object_unref);
1498     gst_object_unref (otherpad);
1499   }
1500
1501   gst_object_unref (rtpsession);
1502
1503   return it;
1504 }
1505
1506 static gboolean
1507 gst_rtp_session_sink_setcaps (GstPad * pad, GstCaps * caps)
1508 {
1509   GstRtpSession *rtpsession;
1510
1511   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1512
1513   GST_RTP_SESSION_LOCK (rtpsession);
1514   gst_rtp_session_cache_caps (rtpsession, caps);
1515   GST_RTP_SESSION_UNLOCK (rtpsession);
1516
1517   gst_object_unref (rtpsession);
1518
1519   return TRUE;
1520 }
1521
1522 /* receive a packet from a sender, send it to the RTP session manager and
1523  * forward the packet on the rtp_src pad
1524  */
1525 static GstFlowReturn
1526 gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer)
1527 {
1528   GstRtpSession *rtpsession;
1529   GstRtpSessionPrivate *priv;
1530   GstFlowReturn ret;
1531   GstClockTime current_time, running_time;
1532   GstClockTime timestamp;
1533
1534   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1535   priv = rtpsession->priv;
1536
1537   GST_LOG_OBJECT (rtpsession, "received RTP packet");
1538
1539   /* get NTP time when this packet was captured, this depends on the timestamp. */
1540   timestamp = GST_BUFFER_TIMESTAMP (buffer);
1541   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
1542     /* convert to running time using the segment values */
1543     running_time =
1544         gst_segment_to_running_time (&rtpsession->recv_rtp_seg, GST_FORMAT_TIME,
1545         timestamp);
1546   } else {
1547     get_current_times (rtpsession, &running_time, NULL);
1548   }
1549   current_time = gst_clock_get_time (priv->sysclock);
1550
1551   ret = rtp_session_process_rtp (priv->session, buffer, current_time,
1552       running_time);
1553   if (ret != GST_FLOW_OK)
1554     goto push_error;
1555
1556 done:
1557   gst_object_unref (rtpsession);
1558
1559   return ret;
1560
1561   /* ERRORS */
1562 push_error:
1563   {
1564     GST_DEBUG_OBJECT (rtpsession, "process returned %s",
1565         gst_flow_get_name (ret));
1566     goto done;
1567   }
1568 }
1569
1570 static gboolean
1571 gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstEvent * event)
1572 {
1573   GstRtpSession *rtpsession;
1574   gboolean ret = FALSE;
1575
1576   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1577
1578   GST_DEBUG_OBJECT (rtpsession, "received event %s",
1579       GST_EVENT_TYPE_NAME (event));
1580
1581   switch (GST_EVENT_TYPE (event)) {
1582     default:
1583       ret = gst_pad_push_event (rtpsession->sync_src, event);
1584       break;
1585   }
1586   gst_object_unref (rtpsession);
1587
1588   return ret;
1589 }
1590
1591 /* Receive an RTCP packet from a sender, send it to the RTP session manager and
1592  * forward the SR packets to the sync_src pad.
1593  */
1594 static GstFlowReturn
1595 gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstBuffer * buffer)
1596 {
1597   GstRtpSession *rtpsession;
1598   GstRtpSessionPrivate *priv;
1599   GstClockTime current_time;
1600   guint64 ntpnstime;
1601
1602   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1603   priv = rtpsession->priv;
1604
1605   GST_LOG_OBJECT (rtpsession, "received RTCP packet");
1606
1607   current_time = gst_clock_get_time (priv->sysclock);
1608   get_current_times (rtpsession, NULL, &ntpnstime);
1609
1610   rtp_session_process_rtcp (priv->session, buffer, current_time, ntpnstime);
1611
1612   gst_object_unref (rtpsession);
1613
1614   return GST_FLOW_OK;           /* always return OK */
1615 }
1616
1617 static gboolean
1618 gst_rtp_session_query_send_rtcp_src (GstPad * pad, GstQuery * query)
1619 {
1620   GstRtpSession *rtpsession;
1621   gboolean ret = FALSE;
1622
1623   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1624
1625   GST_DEBUG_OBJECT (rtpsession, "received QUERY");
1626
1627   switch (GST_QUERY_TYPE (query)) {
1628     case GST_QUERY_LATENCY:
1629       ret = TRUE;
1630       /* use the defaults for the latency query. */
1631       gst_query_set_latency (query, FALSE, 0, -1);
1632       break;
1633     default:
1634       /* other queries simply fail for now */
1635       break;
1636   }
1637
1638   gst_object_unref (rtpsession);
1639
1640   return ret;
1641 }
1642
1643 static gboolean
1644 gst_rtp_session_event_send_rtcp_src (GstPad * pad, GstEvent * event)
1645 {
1646   GstRtpSession *rtpsession;
1647   gboolean ret = TRUE;
1648
1649   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1650   if (G_UNLIKELY (rtpsession == NULL)) {
1651     gst_event_unref (event);
1652     return FALSE;
1653   }
1654   GST_DEBUG_OBJECT (rtpsession, "received EVENT");
1655
1656   switch (GST_EVENT_TYPE (event)) {
1657     case GST_EVENT_SEEK:
1658     case GST_EVENT_LATENCY:
1659       gst_event_unref (event);
1660       ret = TRUE;
1661       break;
1662     default:
1663       /* other events simply fail for now */
1664       gst_event_unref (event);
1665       ret = FALSE;
1666       break;
1667   }
1668
1669   gst_object_unref (rtpsession);
1670   return ret;
1671 }
1672
1673
1674 static gboolean
1675 gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstEvent * event)
1676 {
1677   GstRtpSession *rtpsession;
1678   gboolean ret = FALSE;
1679
1680   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1681
1682   GST_DEBUG_OBJECT (rtpsession, "received event");
1683
1684   switch (GST_EVENT_TYPE (event)) {
1685     case GST_EVENT_FLUSH_STOP:
1686       gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
1687       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
1688       break;
1689     case GST_EVENT_NEWSEGMENT:{
1690       gboolean update;
1691       gdouble rate, arate;
1692       GstFormat format;
1693       gint64 start, stop, time;
1694       GstSegment *segment;
1695
1696       segment = &rtpsession->send_rtp_seg;
1697
1698       /* the newsegment event is needed to convert the RTP timestamp to
1699        * running_time, which is needed to generate a mapping from RTP to NTP
1700        * timestamps in SR reports */
1701       gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1702           &start, &stop, &time);
1703
1704       GST_DEBUG_OBJECT (rtpsession,
1705           "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1706           "format GST_FORMAT_TIME, "
1707           "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1708           ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1709           update, rate, arate, GST_TIME_ARGS (segment->start),
1710           GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1711           GST_TIME_ARGS (segment->accum));
1712
1713       gst_segment_set_newsegment_full (segment, update, rate,
1714           arate, format, start, stop, time);
1715
1716       /* push event forward */
1717       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
1718       break;
1719     }
1720     case GST_EVENT_EOS:{
1721       GstClockTime current_time;
1722
1723       /* push downstream FIXME, we are not supposed to leave the session just
1724        * because we stop sending. */
1725       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
1726       current_time = gst_clock_get_time (rtpsession->priv->sysclock);
1727       GST_DEBUG_OBJECT (rtpsession, "scheduling BYE message");
1728       rtp_session_schedule_bye (rtpsession->priv->session, "End of stream",
1729           current_time);
1730       break;
1731     }
1732     default:{
1733       GstPad *send_rtp_src = NULL;
1734       GST_RTP_SESSION_LOCK (rtpsession);
1735       if (rtpsession->send_rtp_src)
1736         send_rtp_src = gst_object_ref (rtpsession->send_rtp_src);
1737       GST_RTP_SESSION_UNLOCK (rtpsession);
1738
1739       if (send_rtp_src) {
1740         ret = gst_pad_push_event (send_rtp_src, event);
1741         gst_object_unref (send_rtp_src);
1742       } else
1743         gst_event_unref (event);
1744
1745       break;
1746     }
1747   }
1748   gst_object_unref (rtpsession);
1749
1750   return ret;
1751 }
1752
1753 static GstCaps *
1754 gst_rtp_session_getcaps_send_rtp (GstPad * pad)
1755 {
1756   GstRtpSession *rtpsession;
1757   GstRtpSessionPrivate *priv;
1758   GstCaps *result;
1759   GstStructure *s1, *s2;
1760   guint ssrc;
1761
1762   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1763   priv = rtpsession->priv;
1764
1765   ssrc = rtp_session_get_internal_ssrc (priv->session);
1766
1767   /* we can basically accept anything but we prefer to receive packets with our
1768    * internal SSRC so that we don't have to patch it. Create a structure with
1769    * the SSRC and another one without. */
1770   s1 = gst_structure_new ("application/x-rtp", "ssrc", G_TYPE_UINT, ssrc, NULL);
1771   s2 = gst_structure_new ("application/x-rtp", NULL);
1772
1773   result = gst_caps_new_full (s1, s2, NULL);
1774
1775   GST_DEBUG_OBJECT (rtpsession, "getting caps %" GST_PTR_FORMAT, result);
1776
1777   gst_object_unref (rtpsession);
1778
1779   return result;
1780 }
1781
1782 static gboolean
1783 gst_rtp_session_setcaps_send_rtp (GstPad * pad, GstCaps * caps)
1784 {
1785   GstRtpSession *rtpsession;
1786   GstRtpSessionPrivate *priv;
1787   GstStructure *s = gst_caps_get_structure (caps, 0);
1788   guint ssrc;
1789
1790   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1791   priv = rtpsession->priv;
1792
1793   if (gst_structure_get_uint (s, "ssrc", &ssrc)) {
1794     GST_DEBUG_OBJECT (rtpsession, "setting internal SSRC to %08x", ssrc);
1795     rtp_session_set_internal_ssrc (priv->session, ssrc);
1796   }
1797
1798   gst_object_unref (rtpsession);
1799
1800   return TRUE;
1801 }
1802
1803 /* Recieve an RTP packet or a list of packets to be send to the receivers,
1804  * send to RTP session manager and forward to send_rtp_src.
1805  */
1806 static GstFlowReturn
1807 gst_rtp_session_chain_send_rtp_common (GstPad * pad, gpointer data,
1808     gboolean is_list)
1809 {
1810   GstRtpSession *rtpsession;
1811   GstRtpSessionPrivate *priv;
1812   GstFlowReturn ret;
1813   GstClockTime timestamp, running_time;
1814   GstClockTime current_time;
1815
1816   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1817   priv = rtpsession->priv;
1818
1819   GST_LOG_OBJECT (rtpsession, "received RTP %s", is_list ? "list" : "packet");
1820
1821   /* get NTP time when this packet was captured, this depends on the timestamp. */
1822   if (is_list) {
1823     GstBuffer *buffer = NULL;
1824
1825     /* All groups in an list have the same timestamp.
1826      * So, just take it from the first group. */
1827     buffer = gst_buffer_list_get (GST_BUFFER_LIST_CAST (data), 0, 0);
1828     if (buffer)
1829       timestamp = GST_BUFFER_TIMESTAMP (buffer);
1830     else
1831       timestamp = -1;
1832   } else {
1833     timestamp = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (data));
1834   }
1835
1836   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
1837     /* convert to running time using the segment start value. */
1838     running_time =
1839         gst_segment_to_running_time (&rtpsession->send_rtp_seg, GST_FORMAT_TIME,
1840         timestamp);
1841   } else {
1842     /* no timestamp. */
1843     running_time = -1;
1844   }
1845
1846   current_time = gst_clock_get_time (priv->sysclock);
1847   ret = rtp_session_send_rtp (priv->session, data, is_list, current_time,
1848       running_time);
1849   if (ret != GST_FLOW_OK)
1850     goto push_error;
1851
1852 done:
1853   gst_object_unref (rtpsession);
1854
1855   return ret;
1856
1857   /* ERRORS */
1858 push_error:
1859   {
1860     GST_DEBUG_OBJECT (rtpsession, "process returned %s",
1861         gst_flow_get_name (ret));
1862     goto done;
1863   }
1864 }
1865
1866 static GstFlowReturn
1867 gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer)
1868 {
1869   return gst_rtp_session_chain_send_rtp_common (pad, buffer, FALSE);
1870 }
1871
1872 static GstFlowReturn
1873 gst_rtp_session_chain_send_rtp_list (GstPad * pad, GstBufferList * list)
1874 {
1875   return gst_rtp_session_chain_send_rtp_common (pad, list, TRUE);
1876 }
1877
1878 /* Create sinkpad to receive RTP packets from senders. This will also create a
1879  * srcpad for the RTP packets.
1880  */
1881 static GstPad *
1882 create_recv_rtp_sink (GstRtpSession * rtpsession)
1883 {
1884   GST_DEBUG_OBJECT (rtpsession, "creating RTP sink pad");
1885
1886   rtpsession->recv_rtp_sink =
1887       gst_pad_new_from_static_template (&rtpsession_recv_rtp_sink_template,
1888       "recv_rtp_sink");
1889   gst_pad_set_chain_function (rtpsession->recv_rtp_sink,
1890       gst_rtp_session_chain_recv_rtp);
1891   gst_pad_set_event_function (rtpsession->recv_rtp_sink,
1892       (GstPadEventFunction) gst_rtp_session_event_recv_rtp_sink);
1893   gst_pad_set_setcaps_function (rtpsession->recv_rtp_sink,
1894       gst_rtp_session_sink_setcaps);
1895   gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtp_sink,
1896       gst_rtp_session_iterate_internal_links);
1897   gst_pad_set_active (rtpsession->recv_rtp_sink, TRUE);
1898   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
1899       rtpsession->recv_rtp_sink);
1900
1901   GST_DEBUG_OBJECT (rtpsession, "creating RTP src pad");
1902   rtpsession->recv_rtp_src =
1903       gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template,
1904       "recv_rtp_src");
1905   gst_pad_set_event_function (rtpsession->recv_rtp_src,
1906       (GstPadEventFunction) gst_rtp_session_event_recv_rtp_src);
1907   gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtp_src,
1908       gst_rtp_session_iterate_internal_links);
1909   gst_pad_use_fixed_caps (rtpsession->recv_rtp_src);
1910   gst_pad_set_active (rtpsession->recv_rtp_src, TRUE);
1911   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_src);
1912
1913   return rtpsession->recv_rtp_sink;
1914 }
1915
1916 /* Remove sinkpad to receive RTP packets from senders. This will also remove
1917  * the srcpad for the RTP packets.
1918  */
1919 static void
1920 remove_recv_rtp_sink (GstRtpSession * rtpsession)
1921 {
1922   GST_DEBUG_OBJECT (rtpsession, "removing RTP sink pad");
1923
1924   /* deactivate from source to sink */
1925   gst_pad_set_active (rtpsession->recv_rtp_src, FALSE);
1926   gst_pad_set_active (rtpsession->recv_rtp_sink, FALSE);
1927
1928   /* remove pads */
1929   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
1930       rtpsession->recv_rtp_sink);
1931   rtpsession->recv_rtp_sink = NULL;
1932
1933   GST_DEBUG_OBJECT (rtpsession, "removing RTP src pad");
1934   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
1935       rtpsession->recv_rtp_src);
1936   rtpsession->recv_rtp_src = NULL;
1937 }
1938
1939 /* Create a sinkpad to receive RTCP messages from senders, this will also create a
1940  * sync_src pad for the SR packets.
1941  */
1942 static GstPad *
1943 create_recv_rtcp_sink (GstRtpSession * rtpsession)
1944 {
1945   GST_DEBUG_OBJECT (rtpsession, "creating RTCP sink pad");
1946
1947   rtpsession->recv_rtcp_sink =
1948       gst_pad_new_from_static_template (&rtpsession_recv_rtcp_sink_template,
1949       "recv_rtcp_sink");
1950   gst_pad_set_chain_function (rtpsession->recv_rtcp_sink,
1951       gst_rtp_session_chain_recv_rtcp);
1952   gst_pad_set_event_function (rtpsession->recv_rtcp_sink,
1953       (GstPadEventFunction) gst_rtp_session_event_recv_rtcp_sink);
1954   gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtcp_sink,
1955       gst_rtp_session_iterate_internal_links);
1956   gst_pad_set_active (rtpsession->recv_rtcp_sink, TRUE);
1957   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
1958       rtpsession->recv_rtcp_sink);
1959
1960   GST_DEBUG_OBJECT (rtpsession, "creating sync src pad");
1961   rtpsession->sync_src =
1962       gst_pad_new_from_static_template (&rtpsession_sync_src_template,
1963       "sync_src");
1964   gst_pad_set_iterate_internal_links_function (rtpsession->sync_src,
1965       gst_rtp_session_iterate_internal_links);
1966   gst_pad_use_fixed_caps (rtpsession->sync_src);
1967   gst_pad_set_active (rtpsession->sync_src, TRUE);
1968   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->sync_src);
1969
1970   return rtpsession->recv_rtcp_sink;
1971 }
1972
1973 static void
1974 remove_recv_rtcp_sink (GstRtpSession * rtpsession)
1975 {
1976   GST_DEBUG_OBJECT (rtpsession, "removing RTCP sink pad");
1977
1978   gst_pad_set_active (rtpsession->sync_src, FALSE);
1979   gst_pad_set_active (rtpsession->recv_rtcp_sink, FALSE);
1980
1981   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
1982       rtpsession->recv_rtcp_sink);
1983   rtpsession->recv_rtcp_sink = NULL;
1984
1985   GST_DEBUG_OBJECT (rtpsession, "removing sync src pad");
1986   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->sync_src);
1987   rtpsession->sync_src = NULL;
1988 }
1989
1990 /* Create a sinkpad to receive RTP packets for receivers. This will also create a
1991  * send_rtp_src pad.
1992  */
1993 static GstPad *
1994 create_send_rtp_sink (GstRtpSession * rtpsession)
1995 {
1996   GST_DEBUG_OBJECT (rtpsession, "creating pad");
1997
1998   rtpsession->send_rtp_sink =
1999       gst_pad_new_from_static_template (&rtpsession_send_rtp_sink_template,
2000       "send_rtp_sink");
2001   gst_pad_set_chain_function (rtpsession->send_rtp_sink,
2002       gst_rtp_session_chain_send_rtp);
2003   gst_pad_set_chain_list_function (rtpsession->send_rtp_sink,
2004       gst_rtp_session_chain_send_rtp_list);
2005   gst_pad_set_getcaps_function (rtpsession->send_rtp_sink,
2006       gst_rtp_session_getcaps_send_rtp);
2007   gst_pad_set_setcaps_function (rtpsession->send_rtp_sink,
2008       gst_rtp_session_setcaps_send_rtp);
2009   gst_pad_set_event_function (rtpsession->send_rtp_sink,
2010       (GstPadEventFunction) gst_rtp_session_event_send_rtp_sink);
2011   gst_pad_set_iterate_internal_links_function (rtpsession->send_rtp_sink,
2012       gst_rtp_session_iterate_internal_links);
2013   gst_pad_set_active (rtpsession->send_rtp_sink, TRUE);
2014   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
2015       rtpsession->send_rtp_sink);
2016
2017   rtpsession->send_rtp_src =
2018       gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template,
2019       "send_rtp_src");
2020   gst_pad_set_iterate_internal_links_function (rtpsession->send_rtp_src,
2021       gst_rtp_session_iterate_internal_links);
2022   gst_pad_set_active (rtpsession->send_rtp_src, TRUE);
2023   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src);
2024
2025   return rtpsession->send_rtp_sink;
2026 }
2027
2028 static void
2029 remove_send_rtp_sink (GstRtpSession * rtpsession)
2030 {
2031   GST_DEBUG_OBJECT (rtpsession, "removing pad");
2032
2033   gst_pad_set_active (rtpsession->send_rtp_src, FALSE);
2034   gst_pad_set_active (rtpsession->send_rtp_sink, FALSE);
2035
2036   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2037       rtpsession->send_rtp_sink);
2038   rtpsession->send_rtp_sink = NULL;
2039
2040   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2041       rtpsession->send_rtp_src);
2042   rtpsession->send_rtp_src = NULL;
2043 }
2044
2045 /* Create a srcpad with the RTCP packets to send out.
2046  * This pad will be driven by the RTP session manager when it wants to send out
2047  * RTCP packets.
2048  */
2049 static GstPad *
2050 create_send_rtcp_src (GstRtpSession * rtpsession)
2051 {
2052   GST_DEBUG_OBJECT (rtpsession, "creating pad");
2053
2054   rtpsession->send_rtcp_src =
2055       gst_pad_new_from_static_template (&rtpsession_send_rtcp_src_template,
2056       "send_rtcp_src");
2057   gst_pad_use_fixed_caps (rtpsession->send_rtcp_src);
2058   gst_pad_set_active (rtpsession->send_rtcp_src, TRUE);
2059   gst_pad_set_iterate_internal_links_function (rtpsession->send_rtcp_src,
2060       gst_rtp_session_iterate_internal_links);
2061   gst_pad_set_query_function (rtpsession->send_rtcp_src,
2062       gst_rtp_session_query_send_rtcp_src);
2063   gst_pad_set_event_function (rtpsession->send_rtcp_src,
2064       gst_rtp_session_event_send_rtcp_src);
2065   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
2066       rtpsession->send_rtcp_src);
2067
2068   return rtpsession->send_rtcp_src;
2069 }
2070
2071 static void
2072 remove_send_rtcp_src (GstRtpSession * rtpsession)
2073 {
2074   GST_DEBUG_OBJECT (rtpsession, "removing pad");
2075
2076   gst_pad_set_active (rtpsession->send_rtcp_src, FALSE);
2077
2078   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2079       rtpsession->send_rtcp_src);
2080   rtpsession->send_rtcp_src = NULL;
2081 }
2082
2083 static GstPad *
2084 gst_rtp_session_request_new_pad (GstElement * element,
2085     GstPadTemplate * templ, const gchar * name)
2086 {
2087   GstRtpSession *rtpsession;
2088   GstElementClass *klass;
2089   GstPad *result;
2090
2091   g_return_val_if_fail (templ != NULL, NULL);
2092   g_return_val_if_fail (GST_IS_RTP_SESSION (element), NULL);
2093
2094   rtpsession = GST_RTP_SESSION (element);
2095   klass = GST_ELEMENT_GET_CLASS (element);
2096
2097   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
2098
2099   GST_RTP_SESSION_LOCK (rtpsession);
2100
2101   /* figure out the template */
2102   if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink")) {
2103     if (rtpsession->recv_rtp_sink != NULL)
2104       goto exists;
2105
2106     result = create_recv_rtp_sink (rtpsession);
2107   } else if (templ == gst_element_class_get_pad_template (klass,
2108           "recv_rtcp_sink")) {
2109     if (rtpsession->recv_rtcp_sink != NULL)
2110       goto exists;
2111
2112     result = create_recv_rtcp_sink (rtpsession);
2113   } else if (templ == gst_element_class_get_pad_template (klass,
2114           "send_rtp_sink")) {
2115     if (rtpsession->send_rtp_sink != NULL)
2116       goto exists;
2117
2118     result = create_send_rtp_sink (rtpsession);
2119   } else if (templ == gst_element_class_get_pad_template (klass,
2120           "send_rtcp_src")) {
2121     if (rtpsession->send_rtcp_src != NULL)
2122       goto exists;
2123
2124     result = create_send_rtcp_src (rtpsession);
2125   } else
2126     goto wrong_template;
2127
2128   GST_RTP_SESSION_UNLOCK (rtpsession);
2129
2130   return result;
2131
2132   /* ERRORS */
2133 wrong_template:
2134   {
2135     GST_RTP_SESSION_UNLOCK (rtpsession);
2136     g_warning ("gstrtpsession: this is not our template");
2137     return NULL;
2138   }
2139 exists:
2140   {
2141     GST_RTP_SESSION_UNLOCK (rtpsession);
2142     g_warning ("gstrtpsession: pad already requested");
2143     return NULL;
2144   }
2145 }
2146
2147 static void
2148 gst_rtp_session_release_pad (GstElement * element, GstPad * pad)
2149 {
2150   GstRtpSession *rtpsession;
2151
2152   g_return_if_fail (GST_IS_RTP_SESSION (element));
2153   g_return_if_fail (GST_IS_PAD (pad));
2154
2155   rtpsession = GST_RTP_SESSION (element);
2156
2157   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
2158
2159   GST_RTP_SESSION_LOCK (rtpsession);
2160
2161   if (rtpsession->recv_rtp_sink == pad) {
2162     remove_recv_rtp_sink (rtpsession);
2163   } else if (rtpsession->recv_rtcp_sink == pad) {
2164     remove_recv_rtcp_sink (rtpsession);
2165   } else if (rtpsession->send_rtp_sink == pad) {
2166     remove_send_rtp_sink (rtpsession);
2167   } else if (rtpsession->send_rtcp_src == pad) {
2168     remove_send_rtcp_src (rtpsession);
2169   } else
2170     goto wrong_pad;
2171
2172   GST_RTP_SESSION_UNLOCK (rtpsession);
2173
2174   return;
2175
2176   /* ERRORS */
2177 wrong_pad:
2178   {
2179     GST_RTP_SESSION_UNLOCK (rtpsession);
2180     g_warning ("gstrtpsession: asked to release an unknown pad");
2181     return;
2182   }
2183 }
2184
2185 static void
2186 gst_rtp_session_request_key_unit (RTPSession * sess,
2187     gboolean all_headers, gpointer user_data)
2188 {
2189   GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
2190   GstEvent *event;
2191
2192   event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
2193       gst_structure_new ("GstForceKeyUnit",
2194           "all-headers", G_TYPE_BOOLEAN, all_headers, NULL));
2195   gst_pad_push_event (rtpsession->send_rtp_sink, event);
2196 }
2197
2198 static GstClockTime
2199 gst_rtp_session_request_time (RTPSession * session, gpointer user_data)
2200 {
2201   GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
2202
2203   return gst_clock_get_time (rtpsession->priv->sysclock);
2204 }