cc794b626aa30ae667c3c0aa24f0ff9efee41037
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 /**
21  * SECTION:element-gstrtpsession
22  * @see_also: gstrtpjitterbuffer, gstrtpbin, gstrtpptdemux, gstrtpssrcdemux
23  *
24  * The RTP session manager models one participant with a unique SSRC in an RTP
25  * session. This session can be used to send and receive RTP and RTCP packets.
26  * Based on what REQUEST pads are requested from the session manager, specific
27  * functionality can be activated.
28  * 
29  * The session manager currently implements RFC 3550 including:
30  * <itemizedlist>
31  *   <listitem>
32  *     <para>RTP packet validation based on consecutive sequence numbers.</para>
33  *   </listitem>
34  *   <listitem>
35  *     <para>Maintainance of the SSRC participant database.</para>
36  *   </listitem>
37  *   <listitem>
38  *     <para>Keeping per participant statistics based on received RTCP packets.</para>
39  *   </listitem>
40  *   <listitem>
41  *     <para>Scheduling of RR/SR RTCP packets.</para>
42  *   </listitem>
43  * </itemizedlist>
44  * 
45  * The gstrtpsession will not demux packets based on SSRC or payload type, nor will
46  * it correct for packet reordering and jitter. Use #GstRtpsSrcDemux,
47  * #GstRtpPtDemux and GstRtpJitterBuffer in addition to #GstRtpSession to
48  * perform these tasks. It is usually a good idea to use #GstRtpBin, which
49  * combines all these features in one element.
50  * 
51  * To use #GstRtpSession as an RTP receiver, request a recv_rtp_sink pad, which will
52  * automatically create recv_rtp_src pad. Data received on the recv_rtp_sink pad
53  * will be processed in the session and after being validated forwarded on the
54  * recv_rtp_src pad.
55  * 
56  * To also use #GstRtpSession as an RTCP receiver, request a recv_rtcp_sink pad,
57  * which will automatically create a sync_src pad. Packets received on the RTCP
58  * pad will be used by the session manager to update the stats and database of
59  * the other participants. SR packets will be forwarded on the sync_src pad
60  * so that they can be used to perform inter-stream synchronisation when needed.
61  * 
62  * If you want the session manager to generate and send RTCP packets, request
63  * the send_rtcp_src pad. Packet pushed on this pad contain SR/RR RTCP reports
64  * that should be sent to all participants in the session.
65  * 
66  * To use #GstRtpSession as a sender, request a send_rtp_sink pad, which will
67  * automatically create a send_rtp_src pad. The session manager will modify the
68  * SSRC in the RTP packets to its own SSRC and wil forward the packets on the
69  * send_rtp_src pad after updating its internal state.
70  * 
71  * The session manager needs the clock-rate of the payload types it is handling
72  * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
73  * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
74  * signal.
75  * 
76  * <refsect2>
77  * <title>Example pipelines</title>
78  * |[
79  * gst-launch udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink gstrtpsession .recv_rtp_src ! rtptheoradepay ! theoradec ! xvimagesink
80  * ]| Receive theora RTP packets from port 5000 and send them to the depayloader,
81  * decoder and display. Note that the application/x-rtp caps on udpsrc should be
82  * configured based on some negotiation process such as RTSP for this pipeline
83  * to work correctly.
84  * |[
85  * gst-launch udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink gstrtpsession name=session \
86  *        .recv_rtp_src ! rtptheoradepay ! theoradec ! xvimagesink \
87  *     udpsrc port=5001 caps="application/x-rtcp" ! session.recv_rtcp_sink
88  * ]| Receive theora RTP packets from port 5000 and send them to the depayloader,
89  * decoder and display. Receive RTCP packets from port 5001 and process them in
90  * the session manager.
91  * Note that the application/x-rtp caps on udpsrc should be
92  * configured based on some negotiation process such as RTSP for this pipeline
93  * to work correctly.
94  * |[
95  * gst-launch videotestsrc ! theoraenc ! rtptheorapay ! .send_rtp_sink gstrtpsession .send_rtp_src ! udpsink port=5000
96  * ]| Send theora RTP packets through the session manager and out on UDP port
97  * 5000.
98  * |[
99  * gst-launch videotestsrc ! theoraenc ! rtptheorapay ! .send_rtp_sink gstrtpsession name=session .send_rtp_src \
100  *     ! udpsink port=5000  session.send_rtcp_src ! udpsink port=5001
101  * ]| Send theora RTP packets through the session manager and out on UDP port
102  * 5000. Send RTCP packets on port 5001. Note that this pipeline will not preroll
103  * correctly because the second udpsink will not preroll correctly (no RTCP
104  * packets are sent in the PAUSED state). Applications should manually set and
105  * keep (see gst_element_set_locked_state()) the RTCP udpsink to the PLAYING state.
106  * </refsect2>
107  *
108  * Last reviewed on 2007-05-28 (0.10.5)
109  */
110
111 #ifdef HAVE_CONFIG_H
112 #include "config.h"
113 #endif
114
115 #include <gst/rtp/gstrtpbuffer.h>
116
117 #include "gstrtpbin-marshal.h"
118 #include "gstrtpsession.h"
119 #include "rtpsession.h"
120
121 GST_DEBUG_CATEGORY_STATIC (gst_rtp_session_debug);
122 #define GST_CAT_DEFAULT gst_rtp_session_debug
123
124 /* elementfactory information */
125 static const GstElementDetails rtpsession_details =
126 GST_ELEMENT_DETAILS ("RTP Session",
127     "Filter/Network/RTP",
128     "Implement an RTP session",
129     "Wim Taymans <wim.taymans@gmail.com>");
130
131 /* sink pads */
132 static GstStaticPadTemplate rtpsession_recv_rtp_sink_template =
133 GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink",
134     GST_PAD_SINK,
135     GST_PAD_REQUEST,
136     GST_STATIC_CAPS ("application/x-rtp")
137     );
138
139 static GstStaticPadTemplate rtpsession_recv_rtcp_sink_template =
140 GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink",
141     GST_PAD_SINK,
142     GST_PAD_REQUEST,
143     GST_STATIC_CAPS ("application/x-rtcp")
144     );
145
146 static GstStaticPadTemplate rtpsession_send_rtp_sink_template =
147 GST_STATIC_PAD_TEMPLATE ("send_rtp_sink",
148     GST_PAD_SINK,
149     GST_PAD_REQUEST,
150     GST_STATIC_CAPS ("application/x-rtp")
151     );
152
153 /* src pads */
154 static GstStaticPadTemplate rtpsession_recv_rtp_src_template =
155 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src",
156     GST_PAD_SRC,
157     GST_PAD_SOMETIMES,
158     GST_STATIC_CAPS ("application/x-rtp")
159     );
160
161 static GstStaticPadTemplate rtpsession_sync_src_template =
162 GST_STATIC_PAD_TEMPLATE ("sync_src",
163     GST_PAD_SRC,
164     GST_PAD_SOMETIMES,
165     GST_STATIC_CAPS ("application/x-rtcp")
166     );
167
168 static GstStaticPadTemplate rtpsession_send_rtp_src_template =
169 GST_STATIC_PAD_TEMPLATE ("send_rtp_src",
170     GST_PAD_SRC,
171     GST_PAD_SOMETIMES,
172     GST_STATIC_CAPS ("application/x-rtp")
173     );
174
175 static GstStaticPadTemplate rtpsession_send_rtcp_src_template =
176 GST_STATIC_PAD_TEMPLATE ("send_rtcp_src",
177     GST_PAD_SRC,
178     GST_PAD_REQUEST,
179     GST_STATIC_CAPS ("application/x-rtcp")
180     );
181
182 /* signals and args */
183 enum
184 {
185   SIGNAL_REQUEST_PT_MAP,
186   SIGNAL_CLEAR_PT_MAP,
187
188   SIGNAL_ON_NEW_SSRC,
189   SIGNAL_ON_SSRC_COLLISION,
190   SIGNAL_ON_SSRC_VALIDATED,
191   SIGNAL_ON_SSRC_ACTIVE,
192   SIGNAL_ON_SSRC_SDES,
193   SIGNAL_ON_BYE_SSRC,
194   SIGNAL_ON_BYE_TIMEOUT,
195   SIGNAL_ON_TIMEOUT,
196   LAST_SIGNAL
197 };
198
199 #define DEFAULT_NTP_NS_BASE          0
200 #define DEFAULT_BANDWIDTH            RTP_STATS_BANDWIDTH
201 #define DEFAULT_RTCP_FRACTION        RTP_STATS_RTCP_BANDWIDTH
202 #define DEFAULT_SDES_CNAME           NULL
203 #define DEFAULT_SDES_NAME            NULL
204 #define DEFAULT_SDES_EMAIL           NULL
205 #define DEFAULT_SDES_PHONE           NULL
206 #define DEFAULT_SDES_LOCATION        NULL
207 #define DEFAULT_SDES_TOOL            NULL
208 #define DEFAULT_SDES_NOTE            NULL
209 #define DEFAULT_NUM_SOURCES          0
210 #define DEFAULT_NUM_ACTIVE_SOURCES   0
211
212 enum
213 {
214   PROP_0,
215   PROP_NTP_NS_BASE,
216   PROP_BANDWIDTH,
217   PROP_RTCP_FRACTION,
218   PROP_SDES_CNAME,
219   PROP_SDES_NAME,
220   PROP_SDES_EMAIL,
221   PROP_SDES_PHONE,
222   PROP_SDES_LOCATION,
223   PROP_SDES_TOOL,
224   PROP_SDES_NOTE,
225   PROP_NUM_SOURCES,
226   PROP_NUM_ACTIVE_SOURCES,
227   PROP_LAST
228 };
229
230 #define GST_RTP_SESSION_GET_PRIVATE(obj)  \
231            (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_SESSION, GstRtpSessionPrivate))
232
233 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock ((sess)->priv->lock)
234 #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock ((sess)->priv->lock)
235
236 struct _GstRtpSessionPrivate
237 {
238   GMutex *lock;
239   GstClock *sysclock;
240
241   RTPSession *session;
242
243   /* thread for sending out RTCP */
244   GstClockID id;
245   gboolean stop_thread;
246   GThread *thread;
247   gboolean thread_stopped;
248
249   /* caps mapping */
250   GHashTable *ptmap;
251
252   /* NTP base time */
253   guint64 ntpnsbase;
254 };
255
256 /* callbacks to handle actions from the session manager */
257 static GstFlowReturn gst_rtp_session_process_rtp (RTPSession * sess,
258     RTPSource * src, GstBuffer * buffer, gpointer user_data);
259 static GstFlowReturn gst_rtp_session_send_rtp (RTPSession * sess,
260     RTPSource * src, GstBuffer * buffer, gpointer user_data);
261 static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess,
262     RTPSource * src, GstBuffer * buffer, gboolean eos, gpointer user_data);
263 static GstFlowReturn gst_rtp_session_sync_rtcp (RTPSession * sess,
264     RTPSource * src, GstBuffer * buffer, gpointer user_data);
265 static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
266     gpointer user_data);
267 static void gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data);
268
269 static RTPSessionCallbacks callbacks = {
270   gst_rtp_session_process_rtp,
271   gst_rtp_session_send_rtp,
272   gst_rtp_session_sync_rtcp,
273   gst_rtp_session_send_rtcp,
274   gst_rtp_session_clock_rate,
275   gst_rtp_session_reconsider
276 };
277
278 /* GObject vmethods */
279 static void gst_rtp_session_finalize (GObject * object);
280 static void gst_rtp_session_set_property (GObject * object, guint prop_id,
281     const GValue * value, GParamSpec * pspec);
282 static void gst_rtp_session_get_property (GObject * object, guint prop_id,
283     GValue * value, GParamSpec * pspec);
284
285 /* GstElement vmethods */
286 static GstStateChangeReturn gst_rtp_session_change_state (GstElement * element,
287     GstStateChange transition);
288 static GstPad *gst_rtp_session_request_new_pad (GstElement * element,
289     GstPadTemplate * templ, const gchar * name);
290 static void gst_rtp_session_release_pad (GstElement * element, GstPad * pad);
291
292 static void gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession);
293
294 static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 };
295
296 static void
297 on_new_ssrc (RTPSession * session, RTPSource * src, GstRtpSession * sess)
298 {
299   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0,
300       src->ssrc);
301 }
302
303 static void
304 on_ssrc_collision (RTPSession * session, RTPSource * src, GstRtpSession * sess)
305 {
306   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
307       src->ssrc);
308 }
309
310 static void
311 on_ssrc_validated (RTPSession * session, RTPSource * src, GstRtpSession * sess)
312 {
313   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
314       src->ssrc);
315 }
316
317 static void
318 on_ssrc_active (RTPSession * session, RTPSource * src, GstRtpSession * sess)
319 {
320   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0,
321       src->ssrc);
322 }
323
324 static GstStructure *
325 source_get_sdes_structure (RTPSource * src)
326 {
327   GstStructure *result;
328   GValue val = { 0 };
329   gchar *str;
330
331   result = gst_structure_empty_new ("GstRTPSessionSDES");
332
333   gst_structure_set (result, "ssrc", G_TYPE_UINT, src->ssrc, NULL);
334
335   g_value_init (&val, G_TYPE_STRING);
336   str = rtp_source_get_sdes_string (src, GST_RTCP_SDES_CNAME);
337   if (str) {
338     g_value_take_string (&val, str);
339     gst_structure_set_value (result, "cname", &val);
340   }
341   str = rtp_source_get_sdes_string (src, GST_RTCP_SDES_NAME);
342   if (str) {
343     g_value_take_string (&val, str);
344     gst_structure_set_value (result, "name", &val);
345   }
346   str = rtp_source_get_sdes_string (src, GST_RTCP_SDES_EMAIL);
347   if (str) {
348     g_value_take_string (&val, str);
349     gst_structure_set_value (result, "email", &val);
350   }
351   str = rtp_source_get_sdes_string (src, GST_RTCP_SDES_PHONE);
352   if (str) {
353     g_value_take_string (&val, str);
354     gst_structure_set_value (result, "phone", &val);
355   }
356   str = rtp_source_get_sdes_string (src, GST_RTCP_SDES_LOC);
357   if (str) {
358     g_value_take_string (&val, str);
359     gst_structure_set_value (result, "location", &val);
360   }
361   str = rtp_source_get_sdes_string (src, GST_RTCP_SDES_TOOL);
362   if (str) {
363     g_value_take_string (&val, str);
364     gst_structure_set_value (result, "tool", &val);
365   }
366   str = rtp_source_get_sdes_string (src, GST_RTCP_SDES_NOTE);
367   if (str) {
368     g_value_take_string (&val, str);
369     gst_structure_set_value (result, "note", &val);
370   }
371   str = rtp_source_get_sdes_string (src, GST_RTCP_SDES_PRIV);
372   if (str) {
373     g_value_take_string (&val, str);
374     gst_structure_set_value (result, "priv", &val);
375   }
376   g_value_unset (&val);
377
378   return result;
379 }
380
381 static void
382 on_ssrc_sdes (RTPSession * session, RTPSource * src, GstRtpSession * sess)
383 {
384   GstStructure *s;
385   GstMessage *m;
386
387   /* convert the new SDES info into a message */
388   RTP_SESSION_LOCK (session);
389   s = source_get_sdes_structure (src);
390   RTP_SESSION_UNLOCK (session);
391   m = gst_message_new_custom (GST_MESSAGE_ELEMENT, GST_OBJECT (sess), s);
392   gst_element_post_message (GST_ELEMENT_CAST (sess), m);
393
394   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0,
395       src->ssrc);
396 }
397
398 static void
399 on_bye_ssrc (RTPSession * session, RTPSource * src, GstRtpSession * sess)
400 {
401   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0,
402       src->ssrc);
403 }
404
405 static void
406 on_bye_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
407 {
408   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
409       src->ssrc);
410 }
411
412 static void
413 on_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
414 {
415   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_TIMEOUT], 0,
416       src->ssrc);
417 }
418
419 GST_BOILERPLATE (GstRtpSession, gst_rtp_session, GstElement, GST_TYPE_ELEMENT);
420
421 static void
422 gst_rtp_session_base_init (gpointer klass)
423 {
424   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
425
426   /* sink pads */
427   gst_element_class_add_pad_template (element_class,
428       gst_static_pad_template_get (&rtpsession_recv_rtp_sink_template));
429   gst_element_class_add_pad_template (element_class,
430       gst_static_pad_template_get (&rtpsession_recv_rtcp_sink_template));
431   gst_element_class_add_pad_template (element_class,
432       gst_static_pad_template_get (&rtpsession_send_rtp_sink_template));
433
434   /* src pads */
435   gst_element_class_add_pad_template (element_class,
436       gst_static_pad_template_get (&rtpsession_recv_rtp_src_template));
437   gst_element_class_add_pad_template (element_class,
438       gst_static_pad_template_get (&rtpsession_sync_src_template));
439   gst_element_class_add_pad_template (element_class,
440       gst_static_pad_template_get (&rtpsession_send_rtp_src_template));
441   gst_element_class_add_pad_template (element_class,
442       gst_static_pad_template_get (&rtpsession_send_rtcp_src_template));
443
444   gst_element_class_set_details (element_class, &rtpsession_details);
445 }
446
447 static void
448 gst_rtp_session_class_init (GstRtpSessionClass * klass)
449 {
450   GObjectClass *gobject_class;
451   GstElementClass *gstelement_class;
452
453   gobject_class = (GObjectClass *) klass;
454   gstelement_class = (GstElementClass *) klass;
455
456   g_type_class_add_private (klass, sizeof (GstRtpSessionPrivate));
457
458   gobject_class->finalize = gst_rtp_session_finalize;
459   gobject_class->set_property = gst_rtp_session_set_property;
460   gobject_class->get_property = gst_rtp_session_get_property;
461
462   /**
463    * GstRtpSession::request-pt-map:
464    * @sess: the object which received the signal
465    * @pt: the pt
466    *
467    * Request the payload type as #GstCaps for @pt.
468    */
469   gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP] =
470       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
471       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, request_pt_map),
472       NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT, GST_TYPE_CAPS, 1,
473       G_TYPE_UINT);
474   /**
475    * GstRtpSession::clear-pt-map:
476    * @sess: the object which received the signal
477    *
478    * Clear the cached pt-maps requested with #GstRtpSession::request-pt-map.
479    */
480   gst_rtp_session_signals[SIGNAL_CLEAR_PT_MAP] =
481       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
482       G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpSessionClass, clear_pt_map),
483       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
484
485   /**
486    * GstRtpSession::on-new-ssrc:
487    * @sess: the object which received the signal
488    * @ssrc: the SSRC 
489    *
490    * Notify of a new SSRC that entered @session.
491    */
492   gst_rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
493       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
494       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_new_ssrc),
495       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
496   /**
497    * GstRtpSession::on-ssrc_collision:
498    * @sess: the object which received the signal
499    * @ssrc: the SSRC 
500    *
501    * Notify when we have an SSRC collision
502    */
503   gst_rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
504       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
505       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
506           on_ssrc_collision), NULL, NULL, g_cclosure_marshal_VOID__UINT,
507       G_TYPE_NONE, 1, G_TYPE_UINT);
508   /**
509    * GstRtpSession::on-ssrc_validated:
510    * @sess: the object which received the signal
511    * @ssrc: the SSRC 
512    *
513    * Notify of a new SSRC that became validated.
514    */
515   gst_rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
516       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
517       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
518           on_ssrc_validated), NULL, NULL, g_cclosure_marshal_VOID__UINT,
519       G_TYPE_NONE, 1, G_TYPE_UINT);
520   /**
521    * GstRtpSession::on-ssrc_active:
522    * @sess: the object which received the signal
523    * @ssrc: the SSRC
524    *
525    * Notify of a SSRC that is active, i.e., sending RTCP.
526    */
527   gst_rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
528       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
529       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
530           on_ssrc_active), NULL, NULL, g_cclosure_marshal_VOID__UINT,
531       G_TYPE_NONE, 1, G_TYPE_UINT);
532   /**
533    * GstRtpSession::on-ssrc-sdes:
534    * @session: the object which received the signal
535    * @src: the SSRC
536    *
537    * Notify that a new SDES was received for SSRC.
538    */
539   gst_rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
540       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
541       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_ssrc_sdes),
542       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
543
544   /**
545    * GstRtpSession::on-bye-ssrc:
546    * @sess: the object which received the signal
547    * @ssrc: the SSRC 
548    *
549    * Notify of an SSRC that became inactive because of a BYE packet.
550    */
551   gst_rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
552       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
553       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_bye_ssrc),
554       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
555   /**
556    * GstRtpSession::on-bye-timeout:
557    * @sess: the object which received the signal
558    * @ssrc: the SSRC 
559    *
560    * Notify of an SSRC that has timed out because of BYE
561    */
562   gst_rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
563       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
564       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_bye_timeout),
565       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
566   /**
567    * GstRtpSession::on-timeout:
568    * @sess: the object which received the signal
569    * @ssrc: the SSRC 
570    *
571    * Notify of an SSRC that has timed out
572    */
573   gst_rtp_session_signals[SIGNAL_ON_TIMEOUT] =
574       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
575       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_timeout),
576       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
577
578   g_object_class_install_property (gobject_class, PROP_NTP_NS_BASE,
579       g_param_spec_uint64 ("ntp-ns-base", "NTP base time",
580           "The NTP base time corresponding to running_time 0", 0,
581           G_MAXUINT64, DEFAULT_NTP_NS_BASE, G_PARAM_READWRITE));
582
583   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
584       g_param_spec_double ("bandwidth", "Bandwidth",
585           "The bandwidth of the session",
586           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH, G_PARAM_READWRITE));
587
588   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
589       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
590           "The fraction of the bandwidth used for RTCP",
591           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION, G_PARAM_READWRITE));
592
593   g_object_class_install_property (gobject_class, PROP_SDES_CNAME,
594       g_param_spec_string ("sdes-cname", "SDES CNAME",
595           "The CNAME to put in SDES messages of this session",
596           DEFAULT_SDES_CNAME, G_PARAM_READWRITE));
597
598   g_object_class_install_property (gobject_class, PROP_SDES_NAME,
599       g_param_spec_string ("sdes-name", "SDES NAME",
600           "The NAME to put in SDES messages of this session",
601           DEFAULT_SDES_NAME, G_PARAM_READWRITE));
602
603   g_object_class_install_property (gobject_class, PROP_SDES_EMAIL,
604       g_param_spec_string ("sdes-email", "SDES EMAIL",
605           "The EMAIL to put in SDES messages of this session",
606           DEFAULT_SDES_EMAIL, G_PARAM_READWRITE));
607
608   g_object_class_install_property (gobject_class, PROP_SDES_PHONE,
609       g_param_spec_string ("sdes-phone", "SDES PHONE",
610           "The PHONE to put in SDES messages of this session",
611           DEFAULT_SDES_PHONE, G_PARAM_READWRITE));
612
613   g_object_class_install_property (gobject_class, PROP_SDES_LOCATION,
614       g_param_spec_string ("sdes-location", "SDES LOCATION",
615           "The LOCATION to put in SDES messages of this session",
616           DEFAULT_SDES_LOCATION, G_PARAM_READWRITE));
617
618   g_object_class_install_property (gobject_class, PROP_SDES_TOOL,
619       g_param_spec_string ("sdes-tool", "SDES TOOL",
620           "The TOOL to put in SDES messages of this session",
621           DEFAULT_SDES_TOOL, G_PARAM_READWRITE));
622
623   g_object_class_install_property (gobject_class, PROP_SDES_NOTE,
624       g_param_spec_string ("sdes-note", "SDES NOTE",
625           "The NOTE to put in SDES messages of this session",
626           DEFAULT_SDES_NOTE, G_PARAM_READWRITE));
627
628   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
629       g_param_spec_uint ("num-sources", "Num Sources",
630           "The number of sources in the session", 0, G_MAXUINT,
631           DEFAULT_NUM_SOURCES, G_PARAM_READABLE));
632
633   g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
634       g_param_spec_uint ("num-active-sources", "Num Active Sources",
635           "The number of active sources in the session", 0, G_MAXUINT,
636           DEFAULT_NUM_ACTIVE_SOURCES, G_PARAM_READABLE));
637
638   gstelement_class->change_state =
639       GST_DEBUG_FUNCPTR (gst_rtp_session_change_state);
640   gstelement_class->request_new_pad =
641       GST_DEBUG_FUNCPTR (gst_rtp_session_request_new_pad);
642   gstelement_class->release_pad =
643       GST_DEBUG_FUNCPTR (gst_rtp_session_release_pad);
644
645   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_session_clear_pt_map);
646
647   GST_DEBUG_CATEGORY_INIT (gst_rtp_session_debug,
648       "rtpsession", 0, "RTP Session");
649 }
650
651 static void
652 gst_rtp_session_init (GstRtpSession * rtpsession, GstRtpSessionClass * klass)
653 {
654   rtpsession->priv = GST_RTP_SESSION_GET_PRIVATE (rtpsession);
655   rtpsession->priv->lock = g_mutex_new ();
656   rtpsession->priv->sysclock = gst_system_clock_obtain ();
657   rtpsession->priv->session = rtp_session_new ();
658   /* configure callbacks */
659   rtp_session_set_callbacks (rtpsession->priv->session, &callbacks, rtpsession);
660   /* configure signals */
661   g_signal_connect (rtpsession->priv->session, "on-new-ssrc",
662       (GCallback) on_new_ssrc, rtpsession);
663   g_signal_connect (rtpsession->priv->session, "on-ssrc-collision",
664       (GCallback) on_ssrc_collision, rtpsession);
665   g_signal_connect (rtpsession->priv->session, "on-ssrc-validated",
666       (GCallback) on_ssrc_validated, rtpsession);
667   g_signal_connect (rtpsession->priv->session, "on-ssrc-active",
668       (GCallback) on_ssrc_active, rtpsession);
669   g_signal_connect (rtpsession->priv->session, "on-ssrc-sdes",
670       (GCallback) on_ssrc_sdes, rtpsession);
671   g_signal_connect (rtpsession->priv->session, "on-bye-ssrc",
672       (GCallback) on_bye_ssrc, rtpsession);
673   g_signal_connect (rtpsession->priv->session, "on-bye-timeout",
674       (GCallback) on_bye_timeout, rtpsession);
675   g_signal_connect (rtpsession->priv->session, "on-timeout",
676       (GCallback) on_timeout, rtpsession);
677   rtpsession->priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
678       (GDestroyNotify) gst_caps_unref);
679
680   gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
681   gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
682
683   rtpsession->priv->thread_stopped = TRUE;
684 }
685
686 static void
687 gst_rtp_session_finalize (GObject * object)
688 {
689   GstRtpSession *rtpsession;
690
691   rtpsession = GST_RTP_SESSION (object);
692
693   if (rtpsession->recv_rtp_sink != NULL)
694     gst_object_unref (rtpsession->recv_rtp_sink);
695   if (rtpsession->recv_rtcp_sink != NULL)
696     gst_object_unref (rtpsession->recv_rtcp_sink);
697   if (rtpsession->send_rtp_sink != NULL)
698     gst_object_unref (rtpsession->send_rtp_sink);
699   if (rtpsession->send_rtcp_src != NULL)
700     gst_object_unref (rtpsession->send_rtcp_src);
701
702   g_hash_table_destroy (rtpsession->priv->ptmap);
703   g_mutex_free (rtpsession->priv->lock);
704   g_object_unref (rtpsession->priv->sysclock);
705   g_object_unref (rtpsession->priv->session);
706
707   G_OBJECT_CLASS (parent_class)->finalize (object);
708 }
709
710 static void
711 gst_rtp_session_set_property (GObject * object, guint prop_id,
712     const GValue * value, GParamSpec * pspec)
713 {
714   GstRtpSession *rtpsession;
715   GstRtpSessionPrivate *priv;
716
717   rtpsession = GST_RTP_SESSION (object);
718   priv = rtpsession->priv;
719
720   switch (prop_id) {
721     case PROP_NTP_NS_BASE:
722       GST_OBJECT_LOCK (rtpsession);
723       priv->ntpnsbase = g_value_get_uint64 (value);
724       GST_DEBUG_OBJECT (rtpsession, "setting NTP base to %" GST_TIME_FORMAT,
725           GST_TIME_ARGS (priv->ntpnsbase));
726       GST_OBJECT_UNLOCK (rtpsession);
727       break;
728     case PROP_BANDWIDTH:
729       rtp_session_set_bandwidth (priv->session, g_value_get_double (value));
730       break;
731     case PROP_RTCP_FRACTION:
732       rtp_session_set_rtcp_fraction (priv->session, g_value_get_double (value));
733       break;
734     case PROP_SDES_CNAME:
735       rtp_session_set_sdes_string (priv->session, GST_RTCP_SDES_CNAME,
736           g_value_get_string (value));
737       break;
738     case PROP_SDES_NAME:
739       rtp_session_set_sdes_string (priv->session, GST_RTCP_SDES_NAME,
740           g_value_get_string (value));
741       break;
742     case PROP_SDES_EMAIL:
743       rtp_session_set_sdes_string (priv->session, GST_RTCP_SDES_EMAIL,
744           g_value_get_string (value));
745       break;
746     case PROP_SDES_PHONE:
747       rtp_session_set_sdes_string (priv->session, GST_RTCP_SDES_PHONE,
748           g_value_get_string (value));
749       break;
750     case PROP_SDES_LOCATION:
751       rtp_session_set_sdes_string (priv->session, GST_RTCP_SDES_LOC,
752           g_value_get_string (value));
753       break;
754     case PROP_SDES_TOOL:
755       rtp_session_set_sdes_string (priv->session, GST_RTCP_SDES_TOOL,
756           g_value_get_string (value));
757       break;
758     case PROP_SDES_NOTE:
759       rtp_session_set_sdes_string (priv->session, GST_RTCP_SDES_NOTE,
760           g_value_get_string (value));
761       break;
762     default:
763       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
764       break;
765   }
766 }
767
768 static void
769 gst_rtp_session_get_property (GObject * object, guint prop_id,
770     GValue * value, GParamSpec * pspec)
771 {
772   GstRtpSession *rtpsession;
773   GstRtpSessionPrivate *priv;
774
775   rtpsession = GST_RTP_SESSION (object);
776   priv = rtpsession->priv;
777
778   switch (prop_id) {
779     case PROP_NTP_NS_BASE:
780       GST_OBJECT_LOCK (rtpsession);
781       g_value_set_uint64 (value, priv->ntpnsbase);
782       GST_OBJECT_UNLOCK (rtpsession);
783       break;
784     case PROP_BANDWIDTH:
785       g_value_set_double (value, rtp_session_get_bandwidth (priv->session));
786       break;
787     case PROP_RTCP_FRACTION:
788       g_value_set_double (value, rtp_session_get_rtcp_fraction (priv->session));
789       break;
790     case PROP_SDES_CNAME:
791       g_value_take_string (value, rtp_session_get_sdes_string (priv->session,
792               GST_RTCP_SDES_CNAME));
793       break;
794     case PROP_SDES_NAME:
795       g_value_take_string (value, rtp_session_get_sdes_string (priv->session,
796               GST_RTCP_SDES_NAME));
797       break;
798     case PROP_SDES_EMAIL:
799       g_value_take_string (value, rtp_session_get_sdes_string (priv->session,
800               GST_RTCP_SDES_EMAIL));
801       break;
802     case PROP_SDES_PHONE:
803       g_value_take_string (value, rtp_session_get_sdes_string (priv->session,
804               GST_RTCP_SDES_PHONE));
805       break;
806     case PROP_SDES_LOCATION:
807       g_value_take_string (value, rtp_session_get_sdes_string (priv->session,
808               GST_RTCP_SDES_LOC));
809       break;
810     case PROP_SDES_TOOL:
811       g_value_take_string (value, rtp_session_get_sdes_string (priv->session,
812               GST_RTCP_SDES_TOOL));
813       break;
814     case PROP_SDES_NOTE:
815       g_value_take_string (value, rtp_session_get_sdes_string (priv->session,
816               GST_RTCP_SDES_NOTE));
817       break;
818     case PROP_NUM_SOURCES:
819       g_value_set_uint (value, rtp_session_get_num_sources (priv->session));
820       break;
821     case PROP_NUM_ACTIVE_SOURCES:
822       g_value_set_uint (value,
823           rtp_session_get_num_active_sources (priv->session));
824       break;
825     default:
826       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
827       break;
828   }
829 }
830
831 static guint64
832 get_current_ntp_ns_time (GstRtpSession * rtpsession)
833 {
834   guint64 ntpnstime;
835   GstClock *clock;
836   GstClockTime base_time, ntpnsbase;
837
838   GST_OBJECT_LOCK (rtpsession);
839   if ((clock = GST_ELEMENT_CLOCK (rtpsession))) {
840     base_time = GST_ELEMENT_CAST (rtpsession)->base_time;
841     ntpnsbase = rtpsession->priv->ntpnsbase;
842     gst_object_ref (clock);
843     GST_OBJECT_UNLOCK (rtpsession);
844
845     /* get current NTP time */
846     ntpnstime = gst_clock_get_time (clock);
847     /* convert to running time */
848     ntpnstime -= base_time;
849     /* add NTP base offset */
850     ntpnstime += ntpnsbase;
851
852     gst_object_unref (clock);
853   } else {
854     GST_OBJECT_UNLOCK (rtpsession);
855     ntpnstime = -1;
856   }
857
858   return ntpnstime;
859 }
860
861 static void
862 rtcp_thread (GstRtpSession * rtpsession)
863 {
864   GstClockID id;
865   GstClockTime current_time;
866   GstClockTime next_timeout;
867   guint64 ntpnstime;
868
869   GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread");
870
871   GST_RTP_SESSION_LOCK (rtpsession);
872
873   current_time = gst_clock_get_time (rtpsession->priv->sysclock);
874
875   while (!rtpsession->priv->stop_thread) {
876     GstClockReturn res;
877
878     /* get initial estimate */
879     next_timeout =
880         rtp_session_next_timeout (rtpsession->priv->session, current_time);
881
882     GST_DEBUG_OBJECT (rtpsession, "next check time %" GST_TIME_FORMAT,
883         GST_TIME_ARGS (next_timeout));
884
885     /* leave if no more timeouts, the session ended */
886     if (next_timeout == GST_CLOCK_TIME_NONE)
887       break;
888
889     id = rtpsession->priv->id =
890         gst_clock_new_single_shot_id (rtpsession->priv->sysclock, next_timeout);
891     GST_RTP_SESSION_UNLOCK (rtpsession);
892
893     res = gst_clock_id_wait (id, NULL);
894
895     GST_RTP_SESSION_LOCK (rtpsession);
896     gst_clock_id_unref (id);
897     rtpsession->priv->id = NULL;
898
899     if (rtpsession->priv->stop_thread)
900       break;
901
902     /* update current time */
903     current_time = gst_clock_get_time (rtpsession->priv->sysclock);
904
905     /* get current NTP time */
906     ntpnstime = get_current_ntp_ns_time (rtpsession);
907
908     /* we get unlocked because we need to perform reconsideration, don't perform
909      * the timeout but get a new reporting estimate. */
910     GST_DEBUG_OBJECT (rtpsession, "unlocked %d, current %" GST_TIME_FORMAT,
911         res, GST_TIME_ARGS (current_time));
912
913     /* perform actions, we ignore result. Release lock because it might push. */
914     GST_RTP_SESSION_UNLOCK (rtpsession);
915     rtp_session_on_timeout (rtpsession->priv->session, current_time, ntpnstime);
916     GST_RTP_SESSION_LOCK (rtpsession);
917   }
918   /* mark the thread as stopped now */
919   rtpsession->priv->thread_stopped = TRUE;
920   GST_RTP_SESSION_UNLOCK (rtpsession);
921
922   GST_DEBUG_OBJECT (rtpsession, "leaving RTCP thread");
923 }
924
925 static gboolean
926 start_rtcp_thread (GstRtpSession * rtpsession)
927 {
928   GError *error = NULL;
929   gboolean res;
930
931   GST_DEBUG_OBJECT (rtpsession, "starting RTCP thread");
932
933   GST_RTP_SESSION_LOCK (rtpsession);
934   rtpsession->priv->stop_thread = FALSE;
935   if (rtpsession->priv->thread_stopped) {
936     /* only create a new thread if the old one was stopped. Otherwise we can
937      * just reuse the currently running one. */
938     rtpsession->priv->thread =
939         g_thread_create ((GThreadFunc) rtcp_thread, rtpsession, TRUE, &error);
940     rtpsession->priv->thread_stopped = FALSE;
941   }
942   GST_RTP_SESSION_UNLOCK (rtpsession);
943
944   if (error != NULL) {
945     res = FALSE;
946     GST_DEBUG_OBJECT (rtpsession, "failed to start thread, %s", error->message);
947     g_error_free (error);
948   } else {
949     res = TRUE;
950   }
951   return res;
952 }
953
954 static void
955 stop_rtcp_thread (GstRtpSession * rtpsession)
956 {
957   GST_DEBUG_OBJECT (rtpsession, "stopping RTCP thread");
958
959   GST_RTP_SESSION_LOCK (rtpsession);
960   rtpsession->priv->stop_thread = TRUE;
961   if (rtpsession->priv->id)
962     gst_clock_id_unschedule (rtpsession->priv->id);
963   GST_RTP_SESSION_UNLOCK (rtpsession);
964 }
965
966 static void
967 join_rtcp_thread (GstRtpSession * rtpsession)
968 {
969   GST_RTP_SESSION_LOCK (rtpsession);
970   /* don't try to join when we have no thread */
971   if (rtpsession->priv->thread != NULL) {
972     GST_DEBUG_OBJECT (rtpsession, "joining RTCP thread");
973     GST_RTP_SESSION_UNLOCK (rtpsession);
974
975     g_thread_join (rtpsession->priv->thread);
976
977     GST_RTP_SESSION_LOCK (rtpsession);
978     /* after the join, take the lock and clear the thread structure. The caller
979      * is supposed to not concurrently call start and join. */
980     rtpsession->priv->thread = NULL;
981   }
982   GST_RTP_SESSION_UNLOCK (rtpsession);
983 }
984
985 static GstStateChangeReturn
986 gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
987 {
988   GstStateChangeReturn res;
989   GstRtpSession *rtpsession;
990   GstRtpSessionPrivate *priv;
991
992   rtpsession = GST_RTP_SESSION (element);
993   priv = rtpsession->priv;
994
995   switch (transition) {
996     case GST_STATE_CHANGE_NULL_TO_READY:
997       break;
998     case GST_STATE_CHANGE_READY_TO_PAUSED:
999       break;
1000     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1001       break;
1002     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1003     case GST_STATE_CHANGE_PAUSED_TO_READY:
1004       /* no need to join yet, we might want to continue later. Also, the
1005        * dataflow could block downstream so that a join could just block
1006        * forever. */
1007       stop_rtcp_thread (rtpsession);
1008       break;
1009     default:
1010       break;
1011   }
1012
1013   res = parent_class->change_state (element, transition);
1014
1015   switch (transition) {
1016     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1017       if (!start_rtcp_thread (rtpsession))
1018         goto failed_thread;
1019       break;
1020     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1021       break;
1022     case GST_STATE_CHANGE_PAUSED_TO_READY:
1023       /* downstream is now releasing the dataflow and we can join. */
1024       join_rtcp_thread (rtpsession);
1025       break;
1026     case GST_STATE_CHANGE_READY_TO_NULL:
1027       break;
1028     default:
1029       break;
1030   }
1031   return res;
1032
1033   /* ERRORS */
1034 failed_thread:
1035   {
1036     return GST_STATE_CHANGE_FAILURE;
1037   }
1038 }
1039
1040 static gboolean
1041 return_true (gpointer key, gpointer value, gpointer user_data)
1042 {
1043   return TRUE;
1044 }
1045
1046 static void
1047 gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession)
1048 {
1049   g_hash_table_foreach_remove (rtpsession->priv->ptmap, return_true, NULL);
1050 }
1051
1052 /* called when the session manager has an RTP packet ready for further
1053  * processing */
1054 static GstFlowReturn
1055 gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src,
1056     GstBuffer * buffer, gpointer user_data)
1057 {
1058   GstFlowReturn result;
1059   GstRtpSession *rtpsession;
1060   GstRtpSessionPrivate *priv;
1061
1062   rtpsession = GST_RTP_SESSION (user_data);
1063   priv = rtpsession->priv;
1064
1065   if (rtpsession->recv_rtp_src) {
1066     GST_LOG_OBJECT (rtpsession, "pushing received RTP packet");
1067     result = gst_pad_push (rtpsession->recv_rtp_src, buffer);
1068   } else {
1069     GST_DEBUG_OBJECT (rtpsession, "dropping received RTP packet");
1070     gst_buffer_unref (buffer);
1071     result = GST_FLOW_OK;
1072   }
1073   return result;
1074 }
1075
1076 /* called when the session manager has an RTP packet ready for further
1077  * sending */
1078 static GstFlowReturn
1079 gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src,
1080     GstBuffer * buffer, gpointer user_data)
1081 {
1082   GstFlowReturn result;
1083   GstRtpSession *rtpsession;
1084   GstRtpSessionPrivate *priv;
1085
1086   rtpsession = GST_RTP_SESSION (user_data);
1087   priv = rtpsession->priv;
1088
1089   GST_LOG_OBJECT (rtpsession, "sending RTP packet");
1090
1091   if (rtpsession->send_rtp_src) {
1092     result = gst_pad_push (rtpsession->send_rtp_src, buffer);
1093   } else {
1094     gst_buffer_unref (buffer);
1095     result = GST_FLOW_OK;
1096   }
1097   return result;
1098 }
1099
1100 /* called when the session manager has an RTCP packet ready for further
1101  * sending. The eos flag is set when an EOS event should be sent downstream as
1102  * well. */
1103 static GstFlowReturn
1104 gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src,
1105     GstBuffer * buffer, gboolean eos, gpointer user_data)
1106 {
1107   GstFlowReturn result;
1108   GstRtpSession *rtpsession;
1109   GstRtpSessionPrivate *priv;
1110
1111   rtpsession = GST_RTP_SESSION (user_data);
1112   priv = rtpsession->priv;
1113
1114   if (rtpsession->send_rtcp_src) {
1115     GstCaps *caps;
1116
1117     /* set rtcp caps on output pad */
1118     if (!(caps = GST_PAD_CAPS (rtpsession->send_rtcp_src))) {
1119       caps = gst_caps_new_simple ("application/x-rtcp", NULL);
1120       gst_pad_set_caps (rtpsession->send_rtcp_src, caps);
1121       gst_caps_unref (caps);
1122     }
1123     gst_buffer_set_caps (buffer, caps);
1124     GST_LOG_OBJECT (rtpsession, "sending RTCP");
1125     result = gst_pad_push (rtpsession->send_rtcp_src, buffer);
1126
1127     /* we have to send EOS after this packet */
1128     if (eos) {
1129       GST_LOG_OBJECT (rtpsession, "sending EOS");
1130       gst_pad_push_event (rtpsession->send_rtcp_src, gst_event_new_eos ());
1131     }
1132   } else {
1133     GST_DEBUG_OBJECT (rtpsession, "not sending RTCP, no output pad");
1134     gst_buffer_unref (buffer);
1135     result = GST_FLOW_OK;
1136   }
1137   return result;
1138 }
1139
1140 /* called when the session manager has an SR RTCP packet ready for handling
1141  * inter stream synchronisation */
1142 static GstFlowReturn
1143 gst_rtp_session_sync_rtcp (RTPSession * sess,
1144     RTPSource * src, GstBuffer * buffer, gpointer user_data)
1145 {
1146   GstFlowReturn result;
1147   GstRtpSession *rtpsession;
1148   GstRtpSessionPrivate *priv;
1149
1150   rtpsession = GST_RTP_SESSION (user_data);
1151   priv = rtpsession->priv;
1152
1153   if (rtpsession->sync_src) {
1154     GstCaps *caps;
1155
1156     /* set rtcp caps on output pad */
1157     if (!(caps = GST_PAD_CAPS (rtpsession->sync_src))) {
1158       caps = gst_caps_new_simple ("application/x-rtcp", NULL);
1159       gst_pad_set_caps (rtpsession->sync_src, caps);
1160       gst_caps_unref (caps);
1161     }
1162     gst_buffer_set_caps (buffer, caps);
1163     GST_LOG_OBJECT (rtpsession, "sending Sync RTCP");
1164     result = gst_pad_push (rtpsession->sync_src, buffer);
1165   } else {
1166     GST_DEBUG_OBJECT (rtpsession, "not sending Sync RTCP, no output pad");
1167     gst_buffer_unref (buffer);
1168     result = GST_FLOW_OK;
1169   }
1170   return result;
1171 }
1172
1173 static void
1174 gst_rtp_session_cache_caps (GstRtpSession * rtpsession, GstCaps * caps)
1175 {
1176   GstRtpSessionPrivate *priv;
1177   const GstStructure *s;
1178   gint payload;
1179
1180   priv = rtpsession->priv;
1181
1182   GST_DEBUG_OBJECT (rtpsession, "parsing caps");
1183
1184   s = gst_caps_get_structure (caps, 0);
1185   if (!gst_structure_get_int (s, "payload", &payload))
1186     return;
1187
1188   if (g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (payload)))
1189     return;
1190
1191   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (payload),
1192       gst_caps_ref (caps));
1193 }
1194
1195 /* called when the session manager needs the clock rate */
1196 static gint
1197 gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
1198     gpointer user_data)
1199 {
1200   gint ipayload, result = -1;
1201   GstRtpSession *rtpsession;
1202   GstRtpSessionPrivate *priv;
1203   GValue ret = { 0 };
1204   GValue args[2] = { {0}, {0} };
1205   GstCaps *caps;
1206   const GstStructure *s;
1207
1208   rtpsession = GST_RTP_SESSION_CAST (user_data);
1209   priv = rtpsession->priv;
1210
1211   GST_RTP_SESSION_LOCK (rtpsession);
1212   ipayload = payload;           /* make compiler happy */
1213   caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (ipayload));
1214   if (caps) {
1215     gst_caps_ref (caps);
1216     goto found;
1217   }
1218
1219   /* not found in the cache, try to get it with a signal */
1220   g_value_init (&args[0], GST_TYPE_ELEMENT);
1221   g_value_set_object (&args[0], rtpsession);
1222   g_value_init (&args[1], G_TYPE_UINT);
1223   g_value_set_uint (&args[1], payload);
1224
1225   g_value_init (&ret, GST_TYPE_CAPS);
1226   g_value_set_boxed (&ret, NULL);
1227
1228   g_signal_emitv (args, gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP], 0,
1229       &ret);
1230
1231   g_value_unset (&args[0]);
1232   g_value_unset (&args[1]);
1233   caps = (GstCaps *) g_value_dup_boxed (&ret);
1234   g_value_unset (&ret);
1235   if (!caps)
1236     goto no_caps;
1237
1238   gst_rtp_session_cache_caps (rtpsession, caps);
1239
1240 found:
1241   s = gst_caps_get_structure (caps, 0);
1242   if (!gst_structure_get_int (s, "clock-rate", &result))
1243     goto no_clock_rate;
1244
1245   gst_caps_unref (caps);
1246
1247   GST_DEBUG_OBJECT (rtpsession, "parsed clock-rate %d", result);
1248
1249 done:
1250   GST_RTP_SESSION_UNLOCK (rtpsession);
1251
1252   return result;
1253
1254   /* ERRORS */
1255 no_caps:
1256   {
1257     GST_DEBUG_OBJECT (rtpsession, "could not get caps");
1258     goto done;
1259   }
1260 no_clock_rate:
1261   {
1262     gst_caps_unref (caps);
1263     GST_DEBUG_OBJECT (rtpsession, "No clock-rate in caps!");
1264     goto done;
1265   }
1266 }
1267
1268 /* called when the session manager asks us to reconsider the timeout */
1269 static void
1270 gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data)
1271 {
1272   GstRtpSession *rtpsession;
1273
1274   rtpsession = GST_RTP_SESSION_CAST (user_data);
1275
1276   GST_RTP_SESSION_LOCK (rtpsession);
1277   GST_DEBUG_OBJECT (rtpsession, "unlock timer for reconsideration");
1278   if (rtpsession->priv->id)
1279     gst_clock_id_unschedule (rtpsession->priv->id);
1280   GST_RTP_SESSION_UNLOCK (rtpsession);
1281 }
1282
1283 static gboolean
1284 gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event)
1285 {
1286   GstRtpSession *rtpsession;
1287   GstRtpSessionPrivate *priv;
1288   gboolean ret = FALSE;
1289
1290   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1291   priv = rtpsession->priv;
1292
1293   GST_DEBUG_OBJECT (rtpsession, "received event %s",
1294       GST_EVENT_TYPE_NAME (event));
1295
1296   switch (GST_EVENT_TYPE (event)) {
1297     case GST_EVENT_FLUSH_STOP:
1298       gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
1299       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1300       break;
1301     case GST_EVENT_NEWSEGMENT:
1302     {
1303       gboolean update;
1304       gdouble rate, arate;
1305       GstFormat format;
1306       gint64 start, stop, time;
1307       GstSegment *segment;
1308
1309       segment = &rtpsession->recv_rtp_seg;
1310
1311       /* the newsegment event is needed to convert the RTP timestamp to
1312        * running_time, which is needed to generate a mapping from RTP to NTP
1313        * timestamps in SR reports */
1314       gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1315           &start, &stop, &time);
1316
1317       GST_DEBUG_OBJECT (rtpsession,
1318           "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1319           "format GST_FORMAT_TIME, "
1320           "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1321           ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1322           update, rate, arate, GST_TIME_ARGS (segment->start),
1323           GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1324           GST_TIME_ARGS (segment->accum));
1325
1326       gst_segment_set_newsegment_full (segment, update, rate,
1327           arate, format, start, stop, time);
1328
1329       /* push event forward */
1330       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1331       break;
1332     }
1333     default:
1334       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1335       break;
1336   }
1337   gst_object_unref (rtpsession);
1338
1339   return ret;
1340
1341 }
1342 static GList *
1343 gst_rtp_session_internal_links (GstPad * pad)
1344 {
1345   GstRtpSession *rtpsession;
1346   GstRtpSessionPrivate *priv;
1347   GList *res = NULL;
1348
1349   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1350   priv = rtpsession->priv;
1351
1352   if (pad == rtpsession->recv_rtp_src) {
1353     res = g_list_prepend (res, rtpsession->recv_rtp_sink);
1354   } else if (pad == rtpsession->recv_rtp_sink) {
1355     res = g_list_prepend (res, rtpsession->recv_rtp_src);
1356   } else if (pad == rtpsession->send_rtp_src) {
1357     res = g_list_prepend (res, rtpsession->send_rtp_sink);
1358   } else if (pad == rtpsession->send_rtp_sink) {
1359     res = g_list_prepend (res, rtpsession->send_rtp_src);
1360   }
1361
1362   gst_object_unref (rtpsession);
1363
1364   return res;
1365 }
1366
1367 static gboolean
1368 gst_rtp_session_sink_setcaps (GstPad * pad, GstCaps * caps)
1369 {
1370   GstRtpSession *rtpsession;
1371   GstRtpSessionPrivate *priv;
1372
1373   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1374   priv = rtpsession->priv;
1375
1376   GST_RTP_SESSION_LOCK (rtpsession);
1377   gst_rtp_session_cache_caps (rtpsession, caps);
1378   GST_RTP_SESSION_UNLOCK (rtpsession);
1379
1380   gst_object_unref (rtpsession);
1381
1382   return TRUE;
1383 }
1384
1385 /* receive a packet from a sender, send it to the RTP session manager and
1386  * forward the packet on the rtp_src pad
1387  */
1388 static GstFlowReturn
1389 gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer)
1390 {
1391   GstRtpSession *rtpsession;
1392   GstRtpSessionPrivate *priv;
1393   GstFlowReturn ret;
1394   GstClockTime current_time;
1395   guint64 ntpnstime;
1396   GstClockTime timestamp;
1397
1398   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1399   priv = rtpsession->priv;
1400
1401   GST_LOG_OBJECT (rtpsession, "received RTP packet");
1402
1403   /* get NTP time when this packet was captured, this depends on the timestamp. */
1404   timestamp = GST_BUFFER_TIMESTAMP (buffer);
1405   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
1406     /* convert to running time using the segment values */
1407     ntpnstime =
1408         gst_segment_to_running_time (&rtpsession->recv_rtp_seg, GST_FORMAT_TIME,
1409         timestamp);
1410     /* add constant to convert running time to NTP time */
1411     ntpnstime += priv->ntpnsbase;
1412   } else {
1413     ntpnstime = get_current_ntp_ns_time (rtpsession);
1414   }
1415
1416   current_time = gst_clock_get_time (priv->sysclock);
1417   ret = rtp_session_process_rtp (priv->session, buffer, current_time,
1418       ntpnstime);
1419   if (ret != GST_FLOW_OK)
1420     goto push_error;
1421
1422
1423 done:
1424   gst_object_unref (rtpsession);
1425
1426   return ret;
1427
1428   /* ERRORS */
1429 push_error:
1430   {
1431     GST_DEBUG_OBJECT (rtpsession, "process returned %s",
1432         gst_flow_get_name (ret));
1433     goto done;
1434   }
1435 }
1436
1437 static gboolean
1438 gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstEvent * event)
1439 {
1440   GstRtpSession *rtpsession;
1441   GstRtpSessionPrivate *priv;
1442   gboolean ret = FALSE;
1443
1444   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1445   priv = rtpsession->priv;
1446
1447   GST_DEBUG_OBJECT (rtpsession, "received event %s",
1448       GST_EVENT_TYPE_NAME (event));
1449
1450   switch (GST_EVENT_TYPE (event)) {
1451     default:
1452       if (rtpsession->send_rtcp_src) {
1453         gst_event_ref (event);
1454         ret = gst_pad_push_event (rtpsession->send_rtcp_src, event);
1455       }
1456       ret = gst_pad_push_event (rtpsession->sync_src, event);
1457       break;
1458   }
1459   gst_object_unref (rtpsession);
1460
1461   return ret;
1462 }
1463
1464 /* Receive an RTCP packet from a sender, send it to the RTP session manager and
1465  * forward the SR packets to the sync_src pad.
1466  */
1467 static GstFlowReturn
1468 gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstBuffer * buffer)
1469 {
1470   GstRtpSession *rtpsession;
1471   GstRtpSessionPrivate *priv;
1472   GstClockTime current_time;
1473   GstFlowReturn ret;
1474
1475   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1476   priv = rtpsession->priv;
1477
1478   GST_LOG_OBJECT (rtpsession, "received RTCP packet");
1479
1480   current_time = gst_clock_get_time (priv->sysclock);
1481   ret = rtp_session_process_rtcp (priv->session, buffer, current_time);
1482
1483   gst_object_unref (rtpsession);
1484
1485   return GST_FLOW_OK;
1486 }
1487
1488 static gboolean
1489 gst_rtp_session_query_send_rtcp_src (GstPad * pad, GstQuery * query)
1490 {
1491   GstRtpSession *rtpsession;
1492   GstRtpSessionPrivate *priv;
1493   gboolean ret = FALSE;
1494
1495   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1496   priv = rtpsession->priv;
1497
1498   GST_DEBUG_OBJECT (rtpsession, "received QUERY");
1499
1500   switch (GST_QUERY_TYPE (query)) {
1501     case GST_QUERY_LATENCY:
1502       ret = TRUE;
1503       /* use the defaults for the latency query. */
1504       gst_query_set_latency (query, FALSE, 0, -1);
1505       break;
1506     default:
1507       /* other queries simply fail for now */
1508       break;
1509   }
1510
1511   gst_object_unref (rtpsession);
1512
1513   return ret;
1514 }
1515
1516 static gboolean
1517 gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstEvent * event)
1518 {
1519   GstRtpSession *rtpsession;
1520   GstRtpSessionPrivate *priv;
1521   gboolean ret = FALSE;
1522
1523   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1524   priv = rtpsession->priv;
1525
1526   GST_DEBUG_OBJECT (rtpsession, "received event");
1527
1528   switch (GST_EVENT_TYPE (event)) {
1529     case GST_EVENT_FLUSH_STOP:
1530       gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
1531       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
1532       break;
1533     case GST_EVENT_NEWSEGMENT:{
1534       gboolean update;
1535       gdouble rate, arate;
1536       GstFormat format;
1537       gint64 start, stop, time;
1538       GstSegment *segment;
1539
1540       segment = &rtpsession->send_rtp_seg;
1541
1542       /* the newsegment event is needed to convert the RTP timestamp to
1543        * running_time, which is needed to generate a mapping from RTP to NTP
1544        * timestamps in SR reports */
1545       gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1546           &start, &stop, &time);
1547
1548       GST_DEBUG_OBJECT (rtpsession,
1549           "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1550           "format GST_FORMAT_TIME, "
1551           "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1552           ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1553           update, rate, arate, GST_TIME_ARGS (segment->start),
1554           GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1555           GST_TIME_ARGS (segment->accum));
1556
1557       gst_segment_set_newsegment_full (segment, update, rate,
1558           arate, format, start, stop, time);
1559
1560       /* push event forward */
1561       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
1562       break;
1563     }
1564     case GST_EVENT_EOS:{
1565       GstClockTime current_time;
1566
1567       /* push downstream FIXME, we are not supposed to leave the session just
1568        * because we stop sending. */
1569       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
1570       current_time = gst_clock_get_time (rtpsession->priv->sysclock);
1571       GST_DEBUG_OBJECT (rtpsession, "scheduling BYE message");
1572       rtp_session_send_bye (rtpsession->priv->session, "End of stream",
1573           current_time);
1574       break;
1575     }
1576     default:
1577       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
1578       break;
1579   }
1580   gst_object_unref (rtpsession);
1581
1582   return ret;
1583 }
1584
1585 static GstCaps *
1586 gst_rtp_session_getcaps_send_rtp (GstPad * pad)
1587 {
1588   GstRtpSession *rtpsession;
1589   GstRtpSessionPrivate *priv;
1590   GstCaps *result;
1591   GstStructure *s1, *s2;
1592
1593   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1594   priv = rtpsession->priv;
1595
1596   /* we can basically accept anything but we prefer to receive packets with our
1597    * internal SSRC so that we don't have to patch it. Create a structure with
1598    * the SSRC and another one without. */
1599   s1 = gst_structure_new ("application/x-rtp",
1600       "ssrc", G_TYPE_UINT, priv->session->source->ssrc, NULL);
1601   s2 = gst_structure_new ("application/x-rtp", NULL);
1602
1603   result = gst_caps_new_full (s1, s2, NULL);
1604
1605   GST_DEBUG_OBJECT (rtpsession, "getting caps %" GST_PTR_FORMAT, result);
1606
1607   gst_object_unref (rtpsession);
1608
1609   return result;
1610 }
1611
1612 /* Recieve an RTP packet to be send to the receivers, send to RTP session
1613  * manager and forward to send_rtp_src.
1614  */
1615 static GstFlowReturn
1616 gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer)
1617 {
1618   GstRtpSession *rtpsession;
1619   GstRtpSessionPrivate *priv;
1620   GstFlowReturn ret;
1621   GstClockTime timestamp;
1622   GstClockTime current_time;
1623   guint64 ntpnstime;
1624
1625   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1626   priv = rtpsession->priv;
1627
1628   GST_LOG_OBJECT (rtpsession, "received RTP packet");
1629
1630   /* get NTP time when this packet was captured, this depends on the timestamp. */
1631   timestamp = GST_BUFFER_TIMESTAMP (buffer);
1632   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
1633     /* convert to running time using the segment start value. */
1634     ntpnstime =
1635         gst_segment_to_running_time (&rtpsession->send_rtp_seg, GST_FORMAT_TIME,
1636         timestamp);
1637     /* convert to NTP time by adding the NTP base */
1638     ntpnstime += priv->ntpnsbase;
1639   } else {
1640     /* no timestamp, we could take the current running_time and convert it to
1641      * NTP time. */
1642     ntpnstime = -1;
1643   }
1644
1645   current_time = gst_clock_get_time (priv->sysclock);
1646   ret = rtp_session_send_rtp (priv->session, buffer, current_time, ntpnstime);
1647   if (ret != GST_FLOW_OK)
1648     goto push_error;
1649
1650 done:
1651   gst_object_unref (rtpsession);
1652
1653   return ret;
1654
1655   /* ERRORS */
1656 push_error:
1657   {
1658     GST_DEBUG_OBJECT (rtpsession, "process returned %s",
1659         gst_flow_get_name (ret));
1660     goto done;
1661   }
1662 }
1663
1664 /* Create sinkpad to receive RTP packets from senders. This will also create a
1665  * srcpad for the RTP packets.
1666  */
1667 static GstPad *
1668 create_recv_rtp_sink (GstRtpSession * rtpsession)
1669 {
1670   GST_DEBUG_OBJECT (rtpsession, "creating RTP sink pad");
1671
1672   rtpsession->recv_rtp_sink =
1673       gst_pad_new_from_static_template (&rtpsession_recv_rtp_sink_template,
1674       "recv_rtp_sink");
1675   gst_pad_set_chain_function (rtpsession->recv_rtp_sink,
1676       gst_rtp_session_chain_recv_rtp);
1677   gst_pad_set_event_function (rtpsession->recv_rtp_sink,
1678       (GstPadEventFunction) gst_rtp_session_event_recv_rtp_sink);
1679   gst_pad_set_setcaps_function (rtpsession->recv_rtp_sink,
1680       gst_rtp_session_sink_setcaps);
1681   gst_pad_set_internal_link_function (rtpsession->recv_rtp_sink,
1682       gst_rtp_session_internal_links);
1683   gst_pad_set_active (rtpsession->recv_rtp_sink, TRUE);
1684   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
1685       rtpsession->recv_rtp_sink);
1686
1687   GST_DEBUG_OBJECT (rtpsession, "creating RTP src pad");
1688   rtpsession->recv_rtp_src =
1689       gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template,
1690       "recv_rtp_src");
1691   gst_pad_set_internal_link_function (rtpsession->recv_rtp_src,
1692       gst_rtp_session_internal_links);
1693   gst_pad_use_fixed_caps (rtpsession->recv_rtp_src);
1694   gst_pad_set_active (rtpsession->recv_rtp_src, TRUE);
1695   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_src);
1696
1697   return rtpsession->recv_rtp_sink;
1698 }
1699
1700 /* Create a sinkpad to receive RTCP messages from senders, this will also create a
1701  * sync_src pad for the SR packets.
1702  */
1703 static GstPad *
1704 create_recv_rtcp_sink (GstRtpSession * rtpsession)
1705 {
1706   GST_DEBUG_OBJECT (rtpsession, "creating RTCP sink pad");
1707
1708   rtpsession->recv_rtcp_sink =
1709       gst_pad_new_from_static_template (&rtpsession_recv_rtcp_sink_template,
1710       "recv_rtcp_sink");
1711   gst_pad_set_chain_function (rtpsession->recv_rtcp_sink,
1712       gst_rtp_session_chain_recv_rtcp);
1713   gst_pad_set_event_function (rtpsession->recv_rtcp_sink,
1714       (GstPadEventFunction) gst_rtp_session_event_recv_rtcp_sink);
1715   gst_pad_set_internal_link_function (rtpsession->recv_rtcp_sink,
1716       gst_rtp_session_internal_links);
1717   gst_pad_set_active (rtpsession->recv_rtcp_sink, TRUE);
1718   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
1719       rtpsession->recv_rtcp_sink);
1720
1721   GST_DEBUG_OBJECT (rtpsession, "creating sync src pad");
1722   rtpsession->sync_src =
1723       gst_pad_new_from_static_template (&rtpsession_sync_src_template,
1724       "sync_src");
1725   gst_pad_set_internal_link_function (rtpsession->sync_src,
1726       gst_rtp_session_internal_links);
1727   gst_pad_use_fixed_caps (rtpsession->sync_src);
1728   gst_pad_set_active (rtpsession->sync_src, TRUE);
1729   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->sync_src);
1730
1731   return rtpsession->recv_rtcp_sink;
1732 }
1733
1734 /* Create a sinkpad to receive RTP packets for receivers. This will also create a
1735  * send_rtp_src pad.
1736  */
1737 static GstPad *
1738 create_send_rtp_sink (GstRtpSession * rtpsession)
1739 {
1740   GST_DEBUG_OBJECT (rtpsession, "creating pad");
1741
1742   rtpsession->send_rtp_sink =
1743       gst_pad_new_from_static_template (&rtpsession_send_rtp_sink_template,
1744       "send_rtp_sink");
1745   gst_pad_set_chain_function (rtpsession->send_rtp_sink,
1746       gst_rtp_session_chain_send_rtp);
1747   gst_pad_set_getcaps_function (rtpsession->send_rtp_sink,
1748       gst_rtp_session_getcaps_send_rtp);
1749   gst_pad_set_event_function (rtpsession->send_rtp_sink,
1750       (GstPadEventFunction) gst_rtp_session_event_send_rtp_sink);
1751   gst_pad_set_internal_link_function (rtpsession->send_rtp_sink,
1752       gst_rtp_session_internal_links);
1753   gst_pad_set_active (rtpsession->send_rtp_sink, TRUE);
1754   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
1755       rtpsession->send_rtp_sink);
1756
1757   rtpsession->send_rtp_src =
1758       gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template,
1759       "send_rtp_src");
1760   gst_pad_set_internal_link_function (rtpsession->send_rtp_src,
1761       gst_rtp_session_internal_links);
1762   gst_pad_set_active (rtpsession->send_rtp_src, TRUE);
1763   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src);
1764
1765   return rtpsession->send_rtp_sink;
1766 }
1767
1768 /* Create a srcpad with the RTCP packets to send out.
1769  * This pad will be driven by the RTP session manager when it wants to send out
1770  * RTCP packets.
1771  */
1772 static GstPad *
1773 create_send_rtcp_src (GstRtpSession * rtpsession)
1774 {
1775   GST_DEBUG_OBJECT (rtpsession, "creating pad");
1776
1777   rtpsession->send_rtcp_src =
1778       gst_pad_new_from_static_template (&rtpsession_send_rtcp_src_template,
1779       "send_rtcp_src");
1780   gst_pad_use_fixed_caps (rtpsession->send_rtcp_src);
1781   gst_pad_set_active (rtpsession->send_rtcp_src, TRUE);
1782   gst_pad_set_internal_link_function (rtpsession->send_rtcp_src,
1783       gst_rtp_session_internal_links);
1784   gst_pad_set_query_function (rtpsession->send_rtcp_src,
1785       gst_rtp_session_query_send_rtcp_src);
1786   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
1787       rtpsession->send_rtcp_src);
1788
1789   return rtpsession->send_rtcp_src;
1790 }
1791
1792 static GstPad *
1793 gst_rtp_session_request_new_pad (GstElement * element,
1794     GstPadTemplate * templ, const gchar * name)
1795 {
1796   GstRtpSession *rtpsession;
1797   GstElementClass *klass;
1798   GstPad *result;
1799
1800   g_return_val_if_fail (templ != NULL, NULL);
1801   g_return_val_if_fail (GST_IS_RTP_SESSION (element), NULL);
1802
1803   rtpsession = GST_RTP_SESSION (element);
1804   klass = GST_ELEMENT_GET_CLASS (element);
1805
1806   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
1807
1808   GST_RTP_SESSION_LOCK (rtpsession);
1809
1810   /* figure out the template */
1811   if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink")) {
1812     if (rtpsession->recv_rtp_sink != NULL)
1813       goto exists;
1814
1815     result = create_recv_rtp_sink (rtpsession);
1816   } else if (templ == gst_element_class_get_pad_template (klass,
1817           "recv_rtcp_sink")) {
1818     if (rtpsession->recv_rtcp_sink != NULL)
1819       goto exists;
1820
1821     result = create_recv_rtcp_sink (rtpsession);
1822   } else if (templ == gst_element_class_get_pad_template (klass,
1823           "send_rtp_sink")) {
1824     if (rtpsession->send_rtp_sink != NULL)
1825       goto exists;
1826
1827     result = create_send_rtp_sink (rtpsession);
1828   } else if (templ == gst_element_class_get_pad_template (klass,
1829           "send_rtcp_src")) {
1830     if (rtpsession->send_rtcp_src != NULL)
1831       goto exists;
1832
1833     result = create_send_rtcp_src (rtpsession);
1834   } else
1835     goto wrong_template;
1836
1837   GST_RTP_SESSION_UNLOCK (rtpsession);
1838
1839   return result;
1840
1841   /* ERRORS */
1842 wrong_template:
1843   {
1844     GST_RTP_SESSION_UNLOCK (rtpsession);
1845     g_warning ("gstrtpsession: this is not our template");
1846     return NULL;
1847   }
1848 exists:
1849   {
1850     GST_RTP_SESSION_UNLOCK (rtpsession);
1851     g_warning ("gstrtpsession: pad already requested");
1852     return NULL;
1853   }
1854 }
1855
1856 static void
1857 gst_rtp_session_release_pad (GstElement * element, GstPad * pad)
1858 {
1859 }
1860
1861 void
1862 gst_rtp_session_set_ssrc (GstRtpSession * sess, guint32 ssrc)
1863 {
1864   rtp_session_set_internal_ssrc (sess->priv->session, ssrc);
1865 }