gst/rtpmanager/gstrtpjitterbuffer.c: Only peek at the tail element instead of popping...
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 /**
21  * SECTION:element-gstrtpsession
22  * @short_description: an RTP session manager
23  * @see_also: gstrtpjitterbuffer, gstrtpbin, gstrtpptdemux, gstrtpssrcdemux
24  *
25  * <refsect2>
26  * <para>
27  * The RTP session manager models one participant with a unique SSRC in an RTP
28  * session. This session can be used to send and receive RTP and RTCP packets.
29  * Based on what REQUEST pads are requested from the session manager, specific
30  * functionality can be activated.
31  * </para>
32  * <para>
33  * The session manager currently implements RFC 3550 including:
34  * <itemizedlist>
35  *   <listitem>
36  *     <para>RTP packet validation based on consecutive sequence numbers.</para>
37  *   </listitem>
38  *   <listitem>
39  *     <para>Maintainance of the SSRC participant database.</para>
40  *   </listitem>
41  *   <listitem>
42  *     <para>Keeping per participant statistics based on received RTCP packets.</para>
43  *   </listitem>
44  *   <listitem>
45  *     <para>Scheduling of RR/SR RTCP packets.</para>
46  *   </listitem>
47  * </itemizedlist>
48  * </para>
49  * <para>
50  * The gstrtpsession will not demux packets based on SSRC or payload type, nor will
51  * it correct for packet reordering and jitter. Use gstrtpssrcdemux, gstrtpptdemux and
52  * gstrtpjitterbuffer in addition to gstrtpsession to perform these tasks. It is
53  * usually a good idea to use gstrtpbin, which combines all these features in one
54  * element.
55  * </para>
56  * <para>
57  * To use gstrtpsession as an RTP receiver, request a recv_rtp_sink pad, which will
58  * automatically create recv_rtp_src pad. Data received on the recv_rtp_sink pad
59  * will be processed in the session and after being validated forwarded on the
60  * recv_rtp_src pad.
61  * </para>
62  * <para>
63  * To also use gstrtpsession as an RTCP receiver, request a recv_rtcp_sink pad,
64  * which will automatically create a sync_src pad. Packets received on the RTCP
65  * pad will be used by the session manager to update the stats and database of
66  * the other participants. SR packets will be forwarded on the sync_src pad
67  * so that they can be used to perform inter-stream synchronisation when needed.
68  * </para>
69  * <para>
70  * If you want the session manager to generate and send RTCP packets, request
71  * the send_rtcp_src pad. Packet pushed on this pad contain SR/RR RTCP reports
72  * that should be sent to all participants in the session.
73  * </para>
74  * <para>
75  * To use gstrtpsession as a sender, request a send_rtp_sink pad, which will
76  * automatically create a send_rtp_src pad. The session manager will modify the
77  * SSRC in the RTP packets to its own SSRC and wil forward the packets on the
78  * send_rtp_src pad after updating its internal state.
79  * </para>
80  * <para>
81  * The session manager needs the clock-rate of the payload types it is handling
82  * and will signal the GstRtpSession::request-pt-map signal when it needs such a
83  * mapping. One can clear the cached values with the GstRtpSession::clear-pt-map
84  * signal.
85  * </para>
86  * <title>Example pipelines</title>
87  * <para>
88  * <programlisting>
89  * gst-launch udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink gstrtpsession .recv_rtp_src ! rtptheoradepay ! theoradec ! xvimagesink
90  * </programlisting>
91  * Receive theora RTP packets from port 5000 and send them to the depayloader,
92  * decoder and display. Note that the application/x-rtp caps on udpsrc should be
93  * configured based on some negotiation process such as RTSP for this pipeline
94  * to work correctly.
95  * </para>
96  * <para>
97  * <programlisting>
98  * gst-launch udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink gstrtpsession name=session \
99  *        .recv_rtp_src ! rtptheoradepay ! theoradec ! xvimagesink \
100  *     udpsrc port=5001 caps="application/x-rtcp" ! session.recv_rtcp_sink
101  * </programlisting>
102  * Receive theora RTP packets from port 5000 and send them to the depayloader,
103  * decoder and display. Receive RTCP packets from port 5001 and process them in
104  * the session manager.
105  * Note that the application/x-rtp caps on udpsrc should be
106  * configured based on some negotiation process such as RTSP for this pipeline
107  * to work correctly.
108  * </para>
109  * <para>
110  * <programlisting>
111  * gst-launch videotestsrc ! theoraenc ! rtptheorapay ! .send_rtp_sink gstrtpsession .send_rtp_src ! udpsink port=5000
112  * </programlisting>
113  * Send theora RTP packets through the session manager and out on UDP port 5000.
114  * </para>
115  * <para>
116  * <programlisting>
117  * gst-launch videotestsrc ! theoraenc ! rtptheorapay ! .send_rtp_sink gstrtpsession name=session .send_rtp_src \
118  *     ! udpsink port=5000  session.send_rtcp_src ! udpsink port=5001
119  * </programlisting>
120  * Send theora RTP packets through the session manager and out on UDP port 5000.
121  * Send RTCP packets on port 5001. Note that this pipeline will not preroll
122  * correctly because the second udpsink will not preroll correctly (no RTCP
123  * packets are sent in the PAUSED state). Applications should manually set and
124  * keep (see #gst_element_set_locked_state()) the RTCP udpsink to the PLAYING state.
125  * </para>
126  * </refsect2>
127  *
128  * Last reviewed on 2007-05-28 (0.10.5)
129  */
130
131 #ifdef HAVE_CONFIG_H
132 #include "config.h"
133 #endif
134
135 #include <gst/rtp/gstrtpbuffer.h>
136
137 #include "gstrtpbin-marshal.h"
138 #include "gstrtpsession.h"
139 #include "rtpsession.h"
140
141 GST_DEBUG_CATEGORY_STATIC (gst_rtp_session_debug);
142 #define GST_CAT_DEFAULT gst_rtp_session_debug
143
144 /* elementfactory information */
145 static const GstElementDetails rtpsession_details =
146 GST_ELEMENT_DETAILS ("RTP Session",
147     "Filter/Network/RTP",
148     "Implement an RTP session",
149     "Wim Taymans <wim@fluendo.com>");
150
151 /* sink pads */
152 static GstStaticPadTemplate rtpsession_recv_rtp_sink_template =
153 GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink",
154     GST_PAD_SINK,
155     GST_PAD_REQUEST,
156     GST_STATIC_CAPS ("application/x-rtp")
157     );
158
159 static GstStaticPadTemplate rtpsession_recv_rtcp_sink_template =
160 GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink",
161     GST_PAD_SINK,
162     GST_PAD_REQUEST,
163     GST_STATIC_CAPS ("application/x-rtcp")
164     );
165
166 static GstStaticPadTemplate rtpsession_send_rtp_sink_template =
167 GST_STATIC_PAD_TEMPLATE ("send_rtp_sink",
168     GST_PAD_SINK,
169     GST_PAD_REQUEST,
170     GST_STATIC_CAPS ("application/x-rtp")
171     );
172
173 /* src pads */
174 static GstStaticPadTemplate rtpsession_recv_rtp_src_template =
175 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src",
176     GST_PAD_SRC,
177     GST_PAD_SOMETIMES,
178     GST_STATIC_CAPS ("application/x-rtp")
179     );
180
181 static GstStaticPadTemplate rtpsession_sync_src_template =
182 GST_STATIC_PAD_TEMPLATE ("sync_src",
183     GST_PAD_SRC,
184     GST_PAD_SOMETIMES,
185     GST_STATIC_CAPS ("application/x-rtcp")
186     );
187
188 static GstStaticPadTemplate rtpsession_send_rtp_src_template =
189 GST_STATIC_PAD_TEMPLATE ("send_rtp_src",
190     GST_PAD_SRC,
191     GST_PAD_SOMETIMES,
192     GST_STATIC_CAPS ("application/x-rtp")
193     );
194
195 static GstStaticPadTemplate rtpsession_send_rtcp_src_template =
196 GST_STATIC_PAD_TEMPLATE ("send_rtcp_src",
197     GST_PAD_SRC,
198     GST_PAD_REQUEST,
199     GST_STATIC_CAPS ("application/x-rtcp")
200     );
201
202 /* signals and args */
203 enum
204 {
205   SIGNAL_REQUEST_PT_MAP,
206   SIGNAL_CLEAR_PT_MAP,
207
208   SIGNAL_ON_NEW_SSRC,
209   SIGNAL_ON_SSRC_COLLISION,
210   SIGNAL_ON_SSRC_VALIDATED,
211   SIGNAL_ON_SSRC_ACTIVE,
212   SIGNAL_ON_BYE_SSRC,
213   SIGNAL_ON_BYE_TIMEOUT,
214   SIGNAL_ON_TIMEOUT,
215   LAST_SIGNAL
216 };
217
218 #define DEFAULT_NTP_NS_BASE 0
219
220 enum
221 {
222   PROP_0,
223   PROP_NTP_NS_BASE
224 };
225
226 #define GST_RTP_SESSION_GET_PRIVATE(obj)  \
227            (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_SESSION, GstRtpSessionPrivate))
228
229 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock ((sess)->priv->lock)
230 #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock ((sess)->priv->lock)
231
232 struct _GstRtpSessionPrivate
233 {
234   GMutex *lock;
235   RTPSession *session;
236
237   /* thread for sending out RTCP */
238   GstClockID id;
239   gboolean stop_thread;
240   GThread *thread;
241
242   /* caps mapping */
243   GHashTable *ptmap;
244
245   /* NTP base time */
246   guint64 ntpnsbase;
247 };
248
249 /* callbacks to handle actions from the session manager */
250 static GstFlowReturn gst_rtp_session_process_rtp (RTPSession * sess,
251     RTPSource * src, GstBuffer * buffer, gpointer user_data);
252 static GstFlowReturn gst_rtp_session_send_rtp (RTPSession * sess,
253     RTPSource * src, GstBuffer * buffer, gpointer user_data);
254 static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess,
255     RTPSource * src, GstBuffer * buffer, gpointer user_data);
256 static GstFlowReturn gst_rtp_session_sync_rtcp (RTPSession * sess,
257     RTPSource * src, GstBuffer * buffer, gpointer user_data);
258 static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
259     gpointer user_data);
260 static void gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data);
261
262 static RTPSessionCallbacks callbacks = {
263   gst_rtp_session_process_rtp,
264   gst_rtp_session_send_rtp,
265   gst_rtp_session_send_rtcp,
266   gst_rtp_session_sync_rtcp,
267   gst_rtp_session_clock_rate,
268   gst_rtp_session_reconsider
269 };
270
271 /* GObject vmethods */
272 static void gst_rtp_session_finalize (GObject * object);
273 static void gst_rtp_session_set_property (GObject * object, guint prop_id,
274     const GValue * value, GParamSpec * pspec);
275 static void gst_rtp_session_get_property (GObject * object, guint prop_id,
276     GValue * value, GParamSpec * pspec);
277
278 /* GstElement vmethods */
279 static GstStateChangeReturn gst_rtp_session_change_state (GstElement * element,
280     GstStateChange transition);
281 static GstPad *gst_rtp_session_request_new_pad (GstElement * element,
282     GstPadTemplate * templ, const gchar * name);
283 static void gst_rtp_session_release_pad (GstElement * element, GstPad * pad);
284
285 static void gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession);
286
287 static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 };
288
289 static void
290 on_new_ssrc (RTPSession * session, RTPSource * src, GstRtpSession * sess)
291 {
292   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0,
293       src->ssrc);
294 }
295
296 static void
297 on_ssrc_collision (RTPSession * session, RTPSource * src, GstRtpSession * sess)
298 {
299   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
300       src->ssrc);
301 }
302
303 static void
304 on_ssrc_validated (RTPSession * session, RTPSource * src, GstRtpSession * sess)
305 {
306   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
307       src->ssrc);
308 }
309
310 static void
311 on_ssrc_active (RTPSession * session, RTPSource * src, GstRtpSession * sess)
312 {
313   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0,
314       src->ssrc);
315 }
316
317 static void
318 on_bye_ssrc (RTPSession * session, RTPSource * src, GstRtpSession * sess)
319 {
320   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0,
321       src->ssrc);
322 }
323
324 static void
325 on_bye_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
326 {
327   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
328       src->ssrc);
329 }
330
331 static void
332 on_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
333 {
334   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_TIMEOUT], 0,
335       src->ssrc);
336 }
337
338 GST_BOILERPLATE (GstRtpSession, gst_rtp_session, GstElement, GST_TYPE_ELEMENT);
339
340 static void
341 gst_rtp_session_base_init (gpointer klass)
342 {
343   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
344
345   /* sink pads */
346   gst_element_class_add_pad_template (element_class,
347       gst_static_pad_template_get (&rtpsession_recv_rtp_sink_template));
348   gst_element_class_add_pad_template (element_class,
349       gst_static_pad_template_get (&rtpsession_recv_rtcp_sink_template));
350   gst_element_class_add_pad_template (element_class,
351       gst_static_pad_template_get (&rtpsession_send_rtp_sink_template));
352
353   /* src pads */
354   gst_element_class_add_pad_template (element_class,
355       gst_static_pad_template_get (&rtpsession_recv_rtp_src_template));
356   gst_element_class_add_pad_template (element_class,
357       gst_static_pad_template_get (&rtpsession_sync_src_template));
358   gst_element_class_add_pad_template (element_class,
359       gst_static_pad_template_get (&rtpsession_send_rtp_src_template));
360   gst_element_class_add_pad_template (element_class,
361       gst_static_pad_template_get (&rtpsession_send_rtcp_src_template));
362
363   gst_element_class_set_details (element_class, &rtpsession_details);
364 }
365
366 static void
367 gst_rtp_session_class_init (GstRtpSessionClass * klass)
368 {
369   GObjectClass *gobject_class;
370   GstElementClass *gstelement_class;
371
372   gobject_class = (GObjectClass *) klass;
373   gstelement_class = (GstElementClass *) klass;
374
375   g_type_class_add_private (klass, sizeof (GstRtpSessionPrivate));
376
377   gobject_class->finalize = gst_rtp_session_finalize;
378   gobject_class->set_property = gst_rtp_session_set_property;
379   gobject_class->get_property = gst_rtp_session_get_property;
380
381
382   /**
383    * GstRtpSession::request-pt-map:
384    * @sess: the object which received the signal
385    * @pt: the pt
386    *
387    * Request the payload type as #GstCaps for @pt.
388    */
389   gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP] =
390       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
391       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, request_pt_map),
392       NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT, GST_TYPE_CAPS, 1,
393       G_TYPE_UINT);
394   /**
395    * GstRtpSession::clear-pt-map:
396    * @sess: the object which received the signal
397    *
398    * Clear the cached pt-maps requested with GstRtpSession::request-pt-map.
399    */
400   gst_rtp_session_signals[SIGNAL_CLEAR_PT_MAP] =
401       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
402       G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpSessionClass, clear_pt_map),
403       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
404
405   /**
406    * GstRtpSession::on-new-ssrc:
407    * @sess: the object which received the signal
408    * @ssrc: the SSRC 
409    *
410    * Notify of a new SSRC that entered @session.
411    */
412   gst_rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
413       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
414       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_new_ssrc),
415       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
416   /**
417    * GstRtpSession::on-ssrc_collision:
418    * @sess: the object which received the signal
419    * @ssrc: the SSRC 
420    *
421    * Notify when we have an SSRC collision
422    */
423   gst_rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
424       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
425       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
426           on_ssrc_collision), NULL, NULL, g_cclosure_marshal_VOID__UINT,
427       G_TYPE_NONE, 1, G_TYPE_UINT);
428   /**
429    * GstRtpSession::on-ssrc_validated:
430    * @sess: the object which received the signal
431    * @ssrc: the SSRC 
432    *
433    * Notify of a new SSRC that became validated.
434    */
435   gst_rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
436       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
437       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
438           on_ssrc_validated), NULL, NULL, g_cclosure_marshal_VOID__UINT,
439       G_TYPE_NONE, 1, G_TYPE_UINT);
440   /**
441    * GstRtpSession::on-ssrc_active:
442    * @sess: the object which received the signal
443    * @ssrc: the SSRC
444    *
445    * Notify of a SSRC that is active, i.e., sending RTCP.
446    */
447   gst_rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
448       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
449       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
450           on_ssrc_active), NULL, NULL, g_cclosure_marshal_VOID__UINT,
451       G_TYPE_NONE, 1, G_TYPE_UINT);
452
453   /**
454    * GstRtpSession::on-bye-ssrc:
455    * @sess: the object which received the signal
456    * @ssrc: the SSRC 
457    *
458    * Notify of an SSRC that became inactive because of a BYE packet.
459    */
460   gst_rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
461       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
462       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_bye_ssrc),
463       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
464   /**
465    * GstRtpSession::on-bye-timeout:
466    * @sess: the object which received the signal
467    * @ssrc: the SSRC 
468    *
469    * Notify of an SSRC that has timed out because of BYE
470    */
471   gst_rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
472       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
473       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_bye_timeout),
474       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
475   /**
476    * GstRtpSession::on-timeout:
477    * @sess: the object which received the signal
478    * @ssrc: the SSRC 
479    *
480    * Notify of an SSRC that has timed out
481    */
482   gst_rtp_session_signals[SIGNAL_ON_TIMEOUT] =
483       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
484       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_timeout),
485       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
486
487   g_object_class_install_property (gobject_class, PROP_NTP_NS_BASE,
488       g_param_spec_uint64 ("ntp-ns-base", "NTP base time",
489           "The NTP base time corresponding to running_time 0", 0,
490           G_MAXUINT64, DEFAULT_NTP_NS_BASE, G_PARAM_READWRITE));
491
492   gstelement_class->change_state =
493       GST_DEBUG_FUNCPTR (gst_rtp_session_change_state);
494   gstelement_class->request_new_pad =
495       GST_DEBUG_FUNCPTR (gst_rtp_session_request_new_pad);
496   gstelement_class->release_pad =
497       GST_DEBUG_FUNCPTR (gst_rtp_session_release_pad);
498
499   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_session_clear_pt_map);
500
501   GST_DEBUG_CATEGORY_INIT (gst_rtp_session_debug,
502       "rtpsession", 0, "RTP Session");
503 }
504
505 static void
506 gst_rtp_session_init (GstRtpSession * rtpsession, GstRtpSessionClass * klass)
507 {
508   rtpsession->priv = GST_RTP_SESSION_GET_PRIVATE (rtpsession);
509   rtpsession->priv->lock = g_mutex_new ();
510   rtpsession->priv->session = rtp_session_new ();
511   /* configure callbacks */
512   rtp_session_set_callbacks (rtpsession->priv->session, &callbacks, rtpsession);
513   /* configure signals */
514   g_signal_connect (rtpsession->priv->session, "on-new-ssrc",
515       (GCallback) on_new_ssrc, rtpsession);
516   g_signal_connect (rtpsession->priv->session, "on-ssrc-collision",
517       (GCallback) on_ssrc_collision, rtpsession);
518   g_signal_connect (rtpsession->priv->session, "on-ssrc-validated",
519       (GCallback) on_ssrc_validated, rtpsession);
520   g_signal_connect (rtpsession->priv->session, "on-ssrc-active",
521       (GCallback) on_ssrc_active, rtpsession);
522   g_signal_connect (rtpsession->priv->session, "on-bye-ssrc",
523       (GCallback) on_bye_ssrc, rtpsession);
524   g_signal_connect (rtpsession->priv->session, "on-bye-timeout",
525       (GCallback) on_bye_timeout, rtpsession);
526   g_signal_connect (rtpsession->priv->session, "on-timeout",
527       (GCallback) on_timeout, rtpsession);
528   rtpsession->priv->ptmap = g_hash_table_new (NULL, NULL);
529
530   gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
531   gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
532 }
533
534 static void
535 gst_rtp_session_finalize (GObject * object)
536 {
537   GstRtpSession *rtpsession;
538
539   rtpsession = GST_RTP_SESSION (object);
540
541   g_hash_table_destroy (rtpsession->priv->ptmap);
542   g_mutex_free (rtpsession->priv->lock);
543   g_object_unref (rtpsession->priv->session);
544
545   G_OBJECT_CLASS (parent_class)->finalize (object);
546 }
547
548 static void
549 gst_rtp_session_set_property (GObject * object, guint prop_id,
550     const GValue * value, GParamSpec * pspec)
551 {
552   GstRtpSession *rtpsession;
553
554   rtpsession = GST_RTP_SESSION (object);
555
556   switch (prop_id) {
557     case PROP_NTP_NS_BASE:
558       GST_OBJECT_LOCK (rtpsession);
559       rtpsession->priv->ntpnsbase = g_value_get_uint64 (value);
560       GST_DEBUG_OBJECT (rtpsession, "setting NTP base to %" GST_TIME_FORMAT,
561           GST_TIME_ARGS (rtpsession->priv->ntpnsbase));
562       GST_OBJECT_UNLOCK (rtpsession);
563       break;
564     default:
565       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
566       break;
567   }
568 }
569
570 static void
571 gst_rtp_session_get_property (GObject * object, guint prop_id,
572     GValue * value, GParamSpec * pspec)
573 {
574   GstRtpSession *rtpsession;
575
576   rtpsession = GST_RTP_SESSION (object);
577
578   switch (prop_id) {
579     case PROP_NTP_NS_BASE:
580       GST_OBJECT_LOCK (rtpsession);
581       g_value_set_uint64 (value, rtpsession->priv->ntpnsbase);
582       GST_OBJECT_UNLOCK (rtpsession);
583       break;
584     default:
585       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
586       break;
587   }
588 }
589
590 static guint64
591 get_current_ntp_ns_time (GstRtpSession * rtpsession)
592 {
593   guint64 ntpnstime;
594   GstClock *clock;
595   GstClockTime base_time, ntpnsbase;
596
597   GST_OBJECT_LOCK (rtpsession);
598   if ((clock = GST_ELEMENT_CLOCK (rtpsession))) {
599     base_time = GST_ELEMENT_CAST (rtpsession)->base_time;
600     ntpnsbase = rtpsession->priv->ntpnsbase;
601     gst_object_ref (clock);
602     GST_OBJECT_UNLOCK (rtpsession);
603
604     /* get current NTP time */
605     ntpnstime = gst_clock_get_time (clock);
606     /* convert to running time */
607     ntpnstime -= base_time;
608     /* add NTP base offset */
609     ntpnstime += ntpnsbase;
610
611     gst_object_unref (clock);
612   } else {
613     GST_OBJECT_UNLOCK (rtpsession);
614     ntpnstime = -1;
615   }
616
617   return ntpnstime;
618 }
619
620 static void
621 rtcp_thread (GstRtpSession * rtpsession)
622 {
623   GstClock *sysclock;
624   GstClockID id;
625   GstClockTime current_time;
626   GstClockTime next_timeout;
627   guint64 ntpnstime;
628
629   /* for RTCP timeouts we use the system clock */
630   sysclock = gst_system_clock_obtain ();
631   if (sysclock == NULL)
632     goto no_sysclock;
633
634   current_time = gst_clock_get_time (sysclock);
635
636   GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread");
637
638   GST_RTP_SESSION_LOCK (rtpsession);
639
640   while (!rtpsession->priv->stop_thread) {
641     GstClockReturn res;
642
643     /* get initial estimate */
644     next_timeout =
645         rtp_session_next_timeout (rtpsession->priv->session, current_time);
646
647     GST_DEBUG_OBJECT (rtpsession, "next check time %" GST_TIME_FORMAT,
648         GST_TIME_ARGS (next_timeout));
649
650     /* leave if no more timeouts, the session ended */
651     if (next_timeout == GST_CLOCK_TIME_NONE)
652       break;
653
654     id = rtpsession->priv->id =
655         gst_clock_new_single_shot_id (sysclock, next_timeout);
656     GST_RTP_SESSION_UNLOCK (rtpsession);
657
658     res = gst_clock_id_wait (id, NULL);
659
660     GST_RTP_SESSION_LOCK (rtpsession);
661     gst_clock_id_unref (id);
662     rtpsession->priv->id = NULL;
663
664     if (rtpsession->priv->stop_thread)
665       break;
666
667     /* update current time */
668     current_time = gst_clock_get_time (sysclock);
669
670     /* get current NTP time */
671     ntpnstime = get_current_ntp_ns_time (rtpsession);
672
673     /* we get unlocked because we need to perform reconsideration, don't perform
674      * the timeout but get a new reporting estimate. */
675     GST_DEBUG_OBJECT (rtpsession, "unlocked %d, current %" GST_TIME_FORMAT,
676         res, GST_TIME_ARGS (current_time));
677
678     /* perform actions, we ignore result. Release lock because it might push. */
679     GST_RTP_SESSION_UNLOCK (rtpsession);
680     rtp_session_on_timeout (rtpsession->priv->session, current_time, ntpnstime);
681     GST_RTP_SESSION_LOCK (rtpsession);
682   }
683   GST_RTP_SESSION_UNLOCK (rtpsession);
684
685   gst_object_unref (sysclock);
686
687   GST_DEBUG_OBJECT (rtpsession, "leaving RTCP thread");
688   return;
689
690   /* ERRORS */
691 no_sysclock:
692   {
693     GST_ELEMENT_ERROR (rtpsession, CORE, CLOCK, (NULL),
694         ("Could not get system clock"));
695     return;
696   }
697 }
698
699 static gboolean
700 start_rtcp_thread (GstRtpSession * rtpsession)
701 {
702   GError *error = NULL;
703   gboolean res;
704
705   GST_DEBUG_OBJECT (rtpsession, "starting RTCP thread");
706
707   GST_RTP_SESSION_LOCK (rtpsession);
708   rtpsession->priv->stop_thread = FALSE;
709   rtpsession->priv->thread =
710       g_thread_create ((GThreadFunc) rtcp_thread, rtpsession, TRUE, &error);
711   GST_RTP_SESSION_UNLOCK (rtpsession);
712
713   if (error != NULL) {
714     res = FALSE;
715     GST_DEBUG_OBJECT (rtpsession, "failed to start thread, %s", error->message);
716     g_error_free (error);
717   } else {
718     res = TRUE;
719   }
720   return res;
721 }
722
723 static void
724 stop_rtcp_thread (GstRtpSession * rtpsession)
725 {
726   GST_DEBUG_OBJECT (rtpsession, "stopping RTCP thread");
727
728   GST_RTP_SESSION_LOCK (rtpsession);
729   rtpsession->priv->stop_thread = TRUE;
730   if (rtpsession->priv->id)
731     gst_clock_id_unschedule (rtpsession->priv->id);
732   GST_RTP_SESSION_UNLOCK (rtpsession);
733
734   /* FIXME, can deadlock because the thread might be blocked in a push */
735   g_thread_join (rtpsession->priv->thread);
736 }
737
738 static GstStateChangeReturn
739 gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
740 {
741   GstStateChangeReturn res;
742   GstRtpSession *rtpsession;
743   GstRtpSessionPrivate *priv;
744
745   rtpsession = GST_RTP_SESSION (element);
746   priv = rtpsession->priv;
747
748   switch (transition) {
749     case GST_STATE_CHANGE_NULL_TO_READY:
750       break;
751     case GST_STATE_CHANGE_READY_TO_PAUSED:
752       break;
753     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
754       break;
755     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
756       stop_rtcp_thread (rtpsession);
757       break;
758     default:
759       break;
760   }
761
762   res = parent_class->change_state (element, transition);
763
764   switch (transition) {
765     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
766       if (!start_rtcp_thread (rtpsession))
767         goto failed_thread;
768       break;
769     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
770       break;
771     case GST_STATE_CHANGE_PAUSED_TO_READY:
772       break;
773     case GST_STATE_CHANGE_READY_TO_NULL:
774       break;
775     default:
776       break;
777   }
778   return res;
779
780   /* ERRORS */
781 failed_thread:
782   {
783     return GST_STATE_CHANGE_FAILURE;
784   }
785 }
786
787 static void
788 gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession)
789 {
790   /* FIXME, do something */
791 }
792
793 /* called when the session manager has an RTP packet ready for further
794  * processing */
795 static GstFlowReturn
796 gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src,
797     GstBuffer * buffer, gpointer user_data)
798 {
799   GstFlowReturn result;
800   GstRtpSession *rtpsession;
801   GstRtpSessionPrivate *priv;
802
803   rtpsession = GST_RTP_SESSION (user_data);
804   priv = rtpsession->priv;
805
806   if (rtpsession->recv_rtp_src) {
807     GST_DEBUG_OBJECT (rtpsession, "pushing received RTP packet");
808     result = gst_pad_push (rtpsession->recv_rtp_src, buffer);
809   } else {
810     GST_DEBUG_OBJECT (rtpsession, "dropping received RTP packet");
811     gst_buffer_unref (buffer);
812     result = GST_FLOW_OK;
813   }
814   return result;
815 }
816
817 /* called when the session manager has an RTP packet ready for further
818  * sending */
819 static GstFlowReturn
820 gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src,
821     GstBuffer * buffer, gpointer user_data)
822 {
823   GstFlowReturn result;
824   GstRtpSession *rtpsession;
825   GstRtpSessionPrivate *priv;
826
827   rtpsession = GST_RTP_SESSION (user_data);
828   priv = rtpsession->priv;
829
830   GST_DEBUG_OBJECT (rtpsession, "sending RTP packet");
831
832   if (rtpsession->send_rtp_src) {
833     result = gst_pad_push (rtpsession->send_rtp_src, buffer);
834   } else {
835     gst_buffer_unref (buffer);
836     result = GST_FLOW_OK;
837   }
838   return result;
839 }
840
841 /* called when the session manager has an RTCP packet ready for further
842  * sending */
843 static GstFlowReturn
844 gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src,
845     GstBuffer * buffer, gpointer user_data)
846 {
847   GstFlowReturn result;
848   GstRtpSession *rtpsession;
849   GstRtpSessionPrivate *priv;
850
851   rtpsession = GST_RTP_SESSION (user_data);
852   priv = rtpsession->priv;
853
854   if (rtpsession->send_rtcp_src) {
855     GstCaps *caps;
856
857     /* set rtcp caps on output pad */
858     if (!(caps = GST_PAD_CAPS (rtpsession->send_rtcp_src))) {
859       caps = gst_caps_new_simple ("application/x-rtcp", NULL);
860       gst_pad_set_caps (rtpsession->send_rtcp_src, caps);
861       gst_caps_unref (caps);
862     }
863     gst_buffer_set_caps (buffer, caps);
864     GST_DEBUG_OBJECT (rtpsession, "sending RTCP");
865     result = gst_pad_push (rtpsession->send_rtcp_src, buffer);
866   } else {
867     GST_DEBUG_OBJECT (rtpsession, "not sending RTCP, no output pad");
868     gst_buffer_unref (buffer);
869     result = GST_FLOW_OK;
870   }
871   return result;
872 }
873
874 /* called when the session manager has an SR RTCP packet ready for handling
875  * inter stream synchronisation */
876 static GstFlowReturn
877 gst_rtp_session_sync_rtcp (RTPSession * sess,
878     RTPSource * src, GstBuffer * buffer, gpointer user_data)
879 {
880   GstFlowReturn result;
881   GstRtpSession *rtpsession;
882   GstRtpSessionPrivate *priv;
883
884   rtpsession = GST_RTP_SESSION (user_data);
885   priv = rtpsession->priv;
886
887   if (rtpsession->sync_src) {
888     GstCaps *caps;
889
890     /* set rtcp caps on output pad */
891     if (!(caps = GST_PAD_CAPS (rtpsession->sync_src))) {
892       caps = gst_caps_new_simple ("application/x-rtcp", NULL);
893       gst_pad_set_caps (rtpsession->sync_src, caps);
894       gst_caps_unref (caps);
895     }
896     gst_buffer_set_caps (buffer, caps);
897     GST_DEBUG_OBJECT (rtpsession, "sending Sync RTCP");
898     result = gst_pad_push (rtpsession->sync_src, buffer);
899   } else {
900     GST_DEBUG_OBJECT (rtpsession, "not sending Sync RTCP, no output pad");
901     gst_buffer_unref (buffer);
902     result = GST_FLOW_OK;
903   }
904   return result;
905 }
906
907 static void
908 gst_rtp_session_cache_caps (GstRtpSession * rtpsession, GstCaps * caps)
909 {
910   GstRtpSessionPrivate *priv;
911   const GstStructure *s;
912   gint payload;
913
914   priv = rtpsession->priv;
915
916   GST_DEBUG_OBJECT (rtpsession, "parsing caps");
917
918   s = gst_caps_get_structure (caps, 0);
919   if (!gst_structure_get_int (s, "payload", &payload))
920     return;
921
922   caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (payload));
923   if (caps)
924     return;
925
926   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (payload), caps);
927 }
928
929 /* called when the session manager needs the clock rate */
930 static gint
931 gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
932     gpointer user_data)
933 {
934   gint ipayload, result = -1;
935   GstRtpSession *rtpsession;
936   GstRtpSessionPrivate *priv;
937   GValue ret = { 0 };
938   GValue args[2] = { {0}, {0} };
939   GstCaps *caps;
940   const GstStructure *s;
941
942   rtpsession = GST_RTP_SESSION_CAST (user_data);
943   priv = rtpsession->priv;
944
945   GST_RTP_SESSION_LOCK (rtpsession);
946   ipayload = payload;           /* make compiler happy */
947   caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (ipayload));
948   if (caps)
949     goto done;
950
951   g_value_init (&args[0], GST_TYPE_ELEMENT);
952   g_value_set_object (&args[0], rtpsession);
953   g_value_init (&args[1], G_TYPE_UINT);
954   g_value_set_uint (&args[1], payload);
955
956   g_value_init (&ret, GST_TYPE_CAPS);
957   g_value_set_boxed (&ret, NULL);
958
959   g_signal_emitv (args, gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP], 0,
960       &ret);
961
962   caps = (GstCaps *) g_value_get_boxed (&ret);
963   if (!caps)
964     goto no_caps;
965
966   gst_rtp_session_cache_caps (rtpsession, caps);
967
968   s = gst_caps_get_structure (caps, 0);
969   if (!gst_structure_get_int (s, "clock-rate", &result))
970     goto no_clock_rate;
971
972   GST_DEBUG_OBJECT (rtpsession, "parsed clock-rate %d", result);
973
974 done:
975   GST_RTP_SESSION_UNLOCK (rtpsession);
976
977   return result;
978
979   /* ERRORS */
980 no_caps:
981   {
982     GST_DEBUG_OBJECT (rtpsession, "could not get caps");
983     goto done;
984   }
985 no_clock_rate:
986   {
987     GST_DEBUG_OBJECT (rtpsession, "No clock-rate in caps!");
988     goto done;
989   }
990 }
991
992 /* called when the session manager asks us to reconsider the timeout */
993 static void
994 gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data)
995 {
996   GstRtpSession *rtpsession;
997
998   rtpsession = GST_RTP_SESSION_CAST (user_data);
999
1000   GST_RTP_SESSION_LOCK (rtpsession);
1001   GST_DEBUG_OBJECT (rtpsession, "unlock timer for reconsideration");
1002   if (rtpsession->priv->id)
1003     gst_clock_id_unschedule (rtpsession->priv->id);
1004   GST_RTP_SESSION_UNLOCK (rtpsession);
1005 }
1006
1007 static GstFlowReturn
1008 gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event)
1009 {
1010   GstRtpSession *rtpsession;
1011   GstRtpSessionPrivate *priv;
1012   gboolean ret = FALSE;
1013
1014   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1015   priv = rtpsession->priv;
1016
1017   GST_DEBUG_OBJECT (rtpsession, "received event %s",
1018       GST_EVENT_TYPE_NAME (event));
1019
1020   switch (GST_EVENT_TYPE (event)) {
1021     case GST_EVENT_FLUSH_STOP:
1022       gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
1023       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1024       break;
1025     case GST_EVENT_NEWSEGMENT:
1026     {
1027       gboolean update;
1028       gdouble rate, arate;
1029       GstFormat format;
1030       gint64 start, stop, time;
1031       GstSegment *segment;
1032
1033       segment = &rtpsession->recv_rtp_seg;
1034
1035       /* the newsegment event is needed to convert the RTP timestamp to
1036        * running_time, which is needed to generate a mapping from RTP to NTP
1037        * timestamps in SR reports */
1038       gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1039           &start, &stop, &time);
1040
1041       GST_DEBUG_OBJECT (rtpsession,
1042           "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1043           "format GST_FORMAT_TIME, "
1044           "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1045           ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1046           update, rate, arate, GST_TIME_ARGS (segment->start),
1047           GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1048           GST_TIME_ARGS (segment->accum));
1049
1050       gst_segment_set_newsegment_full (segment, update, rate,
1051           arate, format, start, stop, time);
1052
1053       /* push event forward */
1054       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1055       break;
1056     }
1057     default:
1058       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1059       break;
1060   }
1061   gst_object_unref (rtpsession);
1062
1063   return ret;
1064
1065 }
1066 static GList *
1067 gst_rtp_session_internal_links (GstPad * pad)
1068 {
1069   GstRtpSession *rtpsession;
1070   GstRtpSessionPrivate *priv;
1071   GList *res = NULL;
1072
1073   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1074   priv = rtpsession->priv;
1075
1076   if (pad == rtpsession->recv_rtp_src) {
1077     res = g_list_prepend (res, rtpsession->recv_rtp_sink);
1078   } else if (pad == rtpsession->recv_rtp_sink) {
1079     res = g_list_prepend (res, rtpsession->recv_rtp_src);
1080   } else if (pad == rtpsession->send_rtp_src) {
1081     res = g_list_prepend (res, rtpsession->send_rtp_sink);
1082   } else if (pad == rtpsession->send_rtp_sink) {
1083     res = g_list_prepend (res, rtpsession->send_rtp_src);
1084   }
1085
1086   gst_object_unref (rtpsession);
1087
1088   return res;
1089 }
1090
1091 static gboolean
1092 gst_rtp_session_sink_setcaps (GstPad * pad, GstCaps * caps)
1093 {
1094   GstRtpSession *rtpsession;
1095   GstRtpSessionPrivate *priv;
1096
1097   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1098   priv = rtpsession->priv;
1099
1100   GST_RTP_SESSION_LOCK (rtpsession);
1101   gst_rtp_session_cache_caps (rtpsession, caps);
1102   GST_RTP_SESSION_UNLOCK (rtpsession);
1103
1104   gst_object_unref (rtpsession);
1105
1106   return TRUE;
1107 }
1108
1109 /* receive a packet from a sender, send it to the RTP session manager and
1110  * forward the packet on the rtp_src pad
1111  */
1112 static GstFlowReturn
1113 gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer)
1114 {
1115   GstRtpSession *rtpsession;
1116   GstRtpSessionPrivate *priv;
1117   GstFlowReturn ret;
1118   guint64 ntpnstime;
1119   GstClockTime timestamp;
1120
1121   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1122   priv = rtpsession->priv;
1123
1124   GST_DEBUG_OBJECT (rtpsession, "received RTP packet");
1125
1126   /* get NTP time when this packet was captured, this depends on the timestamp. */
1127   timestamp = GST_BUFFER_TIMESTAMP (buffer);
1128   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
1129     /* convert to running time using the segment values */
1130     ntpnstime =
1131         gst_segment_to_running_time (&rtpsession->recv_rtp_seg, GST_FORMAT_TIME,
1132         timestamp);
1133     /* add constant to convert running time to NTP time */
1134     ntpnstime += priv->ntpnsbase;
1135   } else {
1136     ntpnstime = get_current_ntp_ns_time (rtpsession);
1137   }
1138
1139   ret = rtp_session_process_rtp (priv->session, buffer, ntpnstime);
1140   if (ret != GST_FLOW_OK)
1141     goto push_error;
1142
1143
1144 done:
1145   gst_object_unref (rtpsession);
1146
1147   return ret;
1148
1149   /* ERRORS */
1150 push_error:
1151   {
1152     GST_DEBUG_OBJECT (rtpsession, "process returned %s",
1153         gst_flow_get_name (ret));
1154     goto done;
1155   }
1156 }
1157
1158 static GstFlowReturn
1159 gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstEvent * event)
1160 {
1161   GstRtpSession *rtpsession;
1162   GstRtpSessionPrivate *priv;
1163   gboolean ret = FALSE;
1164
1165   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1166   priv = rtpsession->priv;
1167
1168   GST_DEBUG_OBJECT (rtpsession, "received event %s",
1169       GST_EVENT_TYPE_NAME (event));
1170
1171   switch (GST_EVENT_TYPE (event)) {
1172     default:
1173       if (rtpsession->send_rtcp_src) {
1174         gst_event_ref (event);
1175         ret = gst_pad_push_event (rtpsession->send_rtcp_src, event);
1176       }
1177       ret = gst_pad_push_event (rtpsession->sync_src, event);
1178       break;
1179   }
1180   gst_object_unref (rtpsession);
1181
1182   return ret;
1183 }
1184
1185 /* Receive an RTCP packet from a sender, send it to the RTP session manager and
1186  * forward the SR packets to the sync_src pad.
1187  */
1188 static GstFlowReturn
1189 gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstBuffer * buffer)
1190 {
1191   GstRtpSession *rtpsession;
1192   GstRtpSessionPrivate *priv;
1193   GstFlowReturn ret;
1194
1195   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1196   priv = rtpsession->priv;
1197
1198   GST_DEBUG_OBJECT (rtpsession, "received RTCP packet");
1199
1200   ret = rtp_session_process_rtcp (priv->session, buffer);
1201
1202   gst_object_unref (rtpsession);
1203
1204   return GST_FLOW_OK;
1205 }
1206
1207 static GstFlowReturn
1208 gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstEvent * event)
1209 {
1210   GstRtpSession *rtpsession;
1211   GstRtpSessionPrivate *priv;
1212   gboolean ret = FALSE;
1213
1214   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1215   priv = rtpsession->priv;
1216
1217   GST_DEBUG_OBJECT (rtpsession, "received event");
1218
1219   switch (GST_EVENT_TYPE (event)) {
1220     case GST_EVENT_FLUSH_STOP:
1221       gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
1222       break;
1223     case GST_EVENT_NEWSEGMENT:
1224     {
1225       gboolean update;
1226       gdouble rate, arate;
1227       GstFormat format;
1228       gint64 start, stop, time;
1229       GstSegment *segment;
1230
1231       segment = &rtpsession->send_rtp_seg;
1232
1233       /* the newsegment event is needed to convert the RTP timestamp to
1234        * running_time, which is needed to generate a mapping from RTP to NTP
1235        * timestamps in SR reports */
1236       gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1237           &start, &stop, &time);
1238
1239       GST_DEBUG_OBJECT (rtpsession,
1240           "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1241           "format GST_FORMAT_TIME, "
1242           "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1243           ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1244           update, rate, arate, GST_TIME_ARGS (segment->start),
1245           GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1246           GST_TIME_ARGS (segment->accum));
1247
1248       gst_segment_set_newsegment_full (segment, update, rate,
1249           arate, format, start, stop, time);
1250
1251       /* push event forward */
1252       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
1253       break;
1254     }
1255     case GST_EVENT_EOS:
1256       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
1257       break;
1258     default:
1259       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
1260       break;
1261   }
1262   gst_object_unref (rtpsession);
1263
1264   return ret;
1265 }
1266
1267 static GstCaps *
1268 gst_rtp_session_getcaps_send_rtp (GstPad * pad)
1269 {
1270   GstRtpSession *rtpsession;
1271   GstRtpSessionPrivate *priv;
1272   GstCaps *result;
1273   GstStructure *s1, *s2;
1274
1275   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1276   priv = rtpsession->priv;
1277
1278   /* we can basically accept anything but we prefer to receive packets with our
1279    * internal SSRC so that we don't have to patch it. Create a structure with
1280    * the SSRC and another one without. */
1281   s1 = gst_structure_new ("application/x-rtp",
1282       "ssrc", G_TYPE_UINT, priv->session->source->ssrc, NULL);
1283   s2 = gst_structure_new ("application/x-rtp", NULL);
1284
1285   result = gst_caps_new_full (s1, s2, NULL);
1286
1287   GST_DEBUG_OBJECT (rtpsession, "getting caps %" GST_PTR_FORMAT, result);
1288
1289   gst_object_unref (rtpsession);
1290
1291   return result;
1292 }
1293
1294 /* Recieve an RTP packet to be send to the receivers, send to RTP session
1295  * manager and forward to send_rtp_src.
1296  */
1297 static GstFlowReturn
1298 gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer)
1299 {
1300   GstRtpSession *rtpsession;
1301   GstRtpSessionPrivate *priv;
1302   GstFlowReturn ret;
1303   GstClockTime timestamp;
1304   guint64 ntpnstime;
1305
1306   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
1307   priv = rtpsession->priv;
1308
1309   GST_DEBUG_OBJECT (rtpsession, "received RTP packet");
1310
1311   /* get NTP time when this packet was captured, this depends on the timestamp. */
1312   timestamp = GST_BUFFER_TIMESTAMP (buffer);
1313   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
1314     /* convert to running time using the segment start value. */
1315     ntpnstime =
1316         gst_segment_to_running_time (&rtpsession->send_rtp_seg, GST_FORMAT_TIME,
1317         timestamp);
1318     /* convert to NTP time by adding the NTP base */
1319     ntpnstime += priv->ntpnsbase;
1320   } else {
1321     /* no timestamp, we could take the current running_time and convert it to
1322      * NTP time. */
1323     ntpnstime = -1;
1324   }
1325
1326   ret = rtp_session_send_rtp (priv->session, buffer, ntpnstime);
1327   if (ret != GST_FLOW_OK)
1328     goto push_error;
1329
1330 done:
1331   gst_object_unref (rtpsession);
1332
1333   return ret;
1334
1335   /* ERRORS */
1336 push_error:
1337   {
1338     GST_DEBUG_OBJECT (rtpsession, "process returned %s",
1339         gst_flow_get_name (ret));
1340     goto done;
1341   }
1342 }
1343
1344 /* Create sinkpad to receive RTP packets from senders. This will also create a
1345  * srcpad for the RTP packets.
1346  */
1347 static GstPad *
1348 create_recv_rtp_sink (GstRtpSession * rtpsession)
1349 {
1350   GST_DEBUG_OBJECT (rtpsession, "creating RTP sink pad");
1351
1352   rtpsession->recv_rtp_sink =
1353       gst_pad_new_from_static_template (&rtpsession_recv_rtp_sink_template,
1354       "recv_rtp_sink");
1355   gst_pad_set_chain_function (rtpsession->recv_rtp_sink,
1356       gst_rtp_session_chain_recv_rtp);
1357   gst_pad_set_event_function (rtpsession->recv_rtp_sink,
1358       gst_rtp_session_event_recv_rtp_sink);
1359   gst_pad_set_setcaps_function (rtpsession->recv_rtp_sink,
1360       gst_rtp_session_sink_setcaps);
1361   gst_pad_set_internal_link_function (rtpsession->recv_rtp_sink,
1362       gst_rtp_session_internal_links);
1363   gst_pad_set_active (rtpsession->recv_rtp_sink, TRUE);
1364   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
1365       rtpsession->recv_rtp_sink);
1366
1367   GST_DEBUG_OBJECT (rtpsession, "creating RTP src pad");
1368   rtpsession->recv_rtp_src =
1369       gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template,
1370       "recv_rtp_src");
1371   gst_pad_set_internal_link_function (rtpsession->recv_rtp_src,
1372       gst_rtp_session_internal_links);
1373   gst_pad_use_fixed_caps (rtpsession->recv_rtp_src);
1374   gst_pad_set_active (rtpsession->recv_rtp_src, TRUE);
1375   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_src);
1376
1377   return rtpsession->recv_rtp_sink;
1378 }
1379
1380 /* Create a sinkpad to receive RTCP messages from senders, this will also create a
1381  * sync_src pad for the SR packets.
1382  */
1383 static GstPad *
1384 create_recv_rtcp_sink (GstRtpSession * rtpsession)
1385 {
1386   GST_DEBUG_OBJECT (rtpsession, "creating RTCP sink pad");
1387
1388   rtpsession->recv_rtcp_sink =
1389       gst_pad_new_from_static_template (&rtpsession_recv_rtcp_sink_template,
1390       "recv_rtcp_sink");
1391   gst_pad_set_chain_function (rtpsession->recv_rtcp_sink,
1392       gst_rtp_session_chain_recv_rtcp);
1393   gst_pad_set_event_function (rtpsession->recv_rtcp_sink,
1394       gst_rtp_session_event_recv_rtcp_sink);
1395   gst_pad_set_active (rtpsession->recv_rtcp_sink, TRUE);
1396   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
1397       rtpsession->recv_rtcp_sink);
1398
1399   GST_DEBUG_OBJECT (rtpsession, "creating sync src pad");
1400   rtpsession->sync_src =
1401       gst_pad_new_from_static_template (&rtpsession_sync_src_template,
1402       "sync_src");
1403   gst_pad_use_fixed_caps (rtpsession->sync_src);
1404   gst_pad_set_active (rtpsession->sync_src, TRUE);
1405   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->sync_src);
1406
1407   return rtpsession->recv_rtcp_sink;
1408 }
1409
1410 /* Create a sinkpad to receive RTP packets for receivers. This will also create a
1411  * send_rtp_src pad.
1412  */
1413 static GstPad *
1414 create_send_rtp_sink (GstRtpSession * rtpsession)
1415 {
1416   GST_DEBUG_OBJECT (rtpsession, "creating pad");
1417
1418   rtpsession->send_rtp_sink =
1419       gst_pad_new_from_static_template (&rtpsession_send_rtp_sink_template,
1420       "send_rtp_sink");
1421   gst_pad_set_chain_function (rtpsession->send_rtp_sink,
1422       gst_rtp_session_chain_send_rtp);
1423   gst_pad_set_getcaps_function (rtpsession->send_rtp_sink,
1424       gst_rtp_session_getcaps_send_rtp);
1425   gst_pad_set_event_function (rtpsession->send_rtp_sink,
1426       gst_rtp_session_event_send_rtp_sink);
1427   gst_pad_set_internal_link_function (rtpsession->send_rtp_sink,
1428       gst_rtp_session_internal_links);
1429   gst_pad_set_active (rtpsession->send_rtp_sink, TRUE);
1430   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
1431       rtpsession->send_rtp_sink);
1432
1433   rtpsession->send_rtp_src =
1434       gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template,
1435       "send_rtp_src");
1436   gst_pad_use_fixed_caps (rtpsession->send_rtp_src);
1437   gst_pad_set_internal_link_function (rtpsession->send_rtp_src,
1438       gst_rtp_session_internal_links);
1439   gst_pad_set_active (rtpsession->send_rtp_src, TRUE);
1440   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src);
1441
1442   return rtpsession->send_rtp_sink;
1443 }
1444
1445 /* Create a srcpad with the RTCP packets to send out.
1446  * This pad will be driven by the RTP session manager when it wants to send out
1447  * RTCP packets.
1448  */
1449 static GstPad *
1450 create_send_rtcp_src (GstRtpSession * rtpsession)
1451 {
1452   GST_DEBUG_OBJECT (rtpsession, "creating pad");
1453
1454   rtpsession->send_rtcp_src =
1455       gst_pad_new_from_static_template (&rtpsession_send_rtcp_src_template,
1456       "send_rtcp_src");
1457   gst_pad_use_fixed_caps (rtpsession->send_rtcp_src);
1458   gst_pad_set_active (rtpsession->send_rtcp_src, TRUE);
1459   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
1460       rtpsession->send_rtcp_src);
1461
1462   return rtpsession->send_rtcp_src;
1463 }
1464
1465 static GstPad *
1466 gst_rtp_session_request_new_pad (GstElement * element,
1467     GstPadTemplate * templ, const gchar * name)
1468 {
1469   GstRtpSession *rtpsession;
1470   GstElementClass *klass;
1471   GstPad *result;
1472
1473   g_return_val_if_fail (templ != NULL, NULL);
1474   g_return_val_if_fail (GST_IS_RTP_SESSION (element), NULL);
1475
1476   rtpsession = GST_RTP_SESSION (element);
1477   klass = GST_ELEMENT_GET_CLASS (element);
1478
1479   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
1480
1481   GST_RTP_SESSION_LOCK (rtpsession);
1482
1483   /* figure out the template */
1484   if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink")) {
1485     if (rtpsession->recv_rtp_sink != NULL)
1486       goto exists;
1487
1488     result = create_recv_rtp_sink (rtpsession);
1489   } else if (templ == gst_element_class_get_pad_template (klass,
1490           "recv_rtcp_sink")) {
1491     if (rtpsession->recv_rtcp_sink != NULL)
1492       goto exists;
1493
1494     result = create_recv_rtcp_sink (rtpsession);
1495   } else if (templ == gst_element_class_get_pad_template (klass,
1496           "send_rtp_sink")) {
1497     if (rtpsession->send_rtp_sink != NULL)
1498       goto exists;
1499
1500     result = create_send_rtp_sink (rtpsession);
1501   } else if (templ == gst_element_class_get_pad_template (klass,
1502           "send_rtcp_src")) {
1503     if (rtpsession->send_rtcp_src != NULL)
1504       goto exists;
1505
1506     result = create_send_rtcp_src (rtpsession);
1507   } else
1508     goto wrong_template;
1509
1510   GST_RTP_SESSION_UNLOCK (rtpsession);
1511
1512   return result;
1513
1514   /* ERRORS */
1515 wrong_template:
1516   {
1517     GST_RTP_SESSION_UNLOCK (rtpsession);
1518     g_warning ("gstrtpsession: this is not our template");
1519     return NULL;
1520   }
1521 exists:
1522   {
1523     GST_RTP_SESSION_UNLOCK (rtpsession);
1524     g_warning ("gstrtpsession: pad already requested");
1525     return NULL;
1526   }
1527 }
1528
1529 static void
1530 gst_rtp_session_release_pad (GstElement * element, GstPad * pad)
1531 {
1532 }