Fix some compiler warnings when building with G_DISABLE_ASSERT
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpsession.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-rtpsession
22  * @see_also: rtpjitterbuffer, rtpbin, rtpptdemux, rtpssrcdemux
23  *
24  * The RTP session manager models participants with unique SSRC in an RTP
25  * session. This session can be used to send and receive RTP and RTCP packets.
26  * Based on what REQUEST pads are requested from the session manager, specific
27  * functionality can be activated.
28  *
29  * The session manager currently implements RFC 3550 including:
30  * <itemizedlist>
31  *   <listitem>
32  *     <para>RTP packet validation based on consecutive sequence numbers.</para>
33  *   </listitem>
34  *   <listitem>
35  *     <para>Maintainance of the SSRC participant database.</para>
36  *   </listitem>
37  *   <listitem>
38  *     <para>Keeping per participant statistics based on received RTCP packets.</para>
39  *   </listitem>
40  *   <listitem>
41  *     <para>Scheduling of RR/SR RTCP packets.</para>
42  *   </listitem>
43  *   <listitem>
44  *     <para>Support for multiple sender SSRC.</para>
45  *   </listitem>
46  * </itemizedlist>
47  *
48  * The rtpsession will not demux packets based on SSRC or payload type, nor will
49  * it correct for packet reordering and jitter. Use #GstRtpsSrcDemux,
50  * #GstRtpPtDemux and GstRtpJitterBuffer in addition to #GstRtpSession to
51  * perform these tasks. It is usually a good idea to use #GstRtpBin, which
52  * combines all these features in one element.
53  *
54  * To use #GstRtpSession as an RTP receiver, request a recv_rtp_sink pad, which will
55  * automatically create recv_rtp_src pad. Data received on the recv_rtp_sink pad
56  * will be processed in the session and after being validated forwarded on the
57  * recv_rtp_src pad.
58  *
59  * To also use #GstRtpSession as an RTCP receiver, request a recv_rtcp_sink pad,
60  * which will automatically create a sync_src pad. Packets received on the RTCP
61  * pad will be used by the session manager to update the stats and database of
62  * the other participants. SR packets will be forwarded on the sync_src pad
63  * so that they can be used to perform inter-stream synchronisation when needed.
64  *
65  * If you want the session manager to generate and send RTCP packets, request
66  * the send_rtcp_src pad. Packet pushed on this pad contain SR/RR RTCP reports
67  * that should be sent to all participants in the session.
68  *
69  * To use #GstRtpSession as a sender, request a send_rtp_sink pad, which will
70  * automatically create a send_rtp_src pad. The session manager will
71  * forward the packets on the send_rtp_src pad after updating its internal state.
72  *
73  * The session manager needs the clock-rate of the payload types it is handling
74  * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
75  * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
76  * signal.
77  *
78  * <refsect2>
79  * <title>Example pipelines</title>
80  * |[
81  * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink rtpsession .recv_rtp_src ! rtptheoradepay ! theoradec ! xvimagesink
82  * ]| Receive theora RTP packets from port 5000 and send them to the depayloader,
83  * decoder and display. Note that the application/x-rtp caps on udpsrc should be
84  * configured based on some negotiation process such as RTSP for this pipeline
85  * to work correctly.
86  * |[
87  * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink rtpsession name=session \
88  *        .recv_rtp_src ! rtptheoradepay ! theoradec ! xvimagesink \
89  *     udpsrc port=5001 caps="application/x-rtcp" ! session.recv_rtcp_sink
90  * ]| Receive theora RTP packets from port 5000 and send them to the depayloader,
91  * decoder and display. Receive RTCP packets from port 5001 and process them in
92  * the session manager.
93  * Note that the application/x-rtp caps on udpsrc should be
94  * configured based on some negotiation process such as RTSP for this pipeline
95  * to work correctly.
96  * |[
97  * gst-launch-1.0 videotestsrc ! theoraenc ! rtptheorapay ! .send_rtp_sink rtpsession .send_rtp_src ! udpsink port=5000
98  * ]| Send theora RTP packets through the session manager and out on UDP port
99  * 5000.
100  * |[
101  * gst-launch-1.0 videotestsrc ! theoraenc ! rtptheorapay ! .send_rtp_sink rtpsession name=session .send_rtp_src \
102  *     ! udpsink port=5000  session.send_rtcp_src ! udpsink port=5001
103  * ]| Send theora RTP packets through the session manager and out on UDP port
104  * 5000. Send RTCP packets on port 5001. Note that this pipeline will not preroll
105  * correctly because the second udpsink will not preroll correctly (no RTCP
106  * packets are sent in the PAUSED state). Applications should manually set and
107  * keep (see gst_element_set_locked_state()) the RTCP udpsink to the PLAYING state.
108  * </refsect2>
109  */
110
111 #ifdef HAVE_CONFIG_H
112 #include "config.h"
113 #endif
114
115 #include <gst/rtp/gstrtpbuffer.h>
116
117 #include <gst/glib-compat-private.h>
118
119 #include "gstrtpsession.h"
120 #include "rtpsession.h"
121
122 GST_DEBUG_CATEGORY_STATIC (gst_rtp_session_debug);
123 #define GST_CAT_DEFAULT gst_rtp_session_debug
124
125 GType
126 gst_rtp_ntp_time_source_get_type (void)
127 {
128   static GType type = 0;
129   static const GEnumValue values[] = {
130     {GST_RTP_NTP_TIME_SOURCE_NTP, "NTP time based on realtime clock", "ntp"},
131     {GST_RTP_NTP_TIME_SOURCE_UNIX, "UNIX time based on realtime clock", "unix"},
132     {GST_RTP_NTP_TIME_SOURCE_RUNNING_TIME,
133           "Running time based on pipeline clock",
134         "running-time"},
135     {GST_RTP_NTP_TIME_SOURCE_CLOCK_TIME, "Pipeline clock time", "clock-time"},
136     {0, NULL, NULL},
137   };
138
139   if (!type) {
140     type = g_enum_register_static ("GstRtpNtpTimeSource", values);
141   }
142   return type;
143 }
144
145 /* sink pads */
146 static GstStaticPadTemplate rtpsession_recv_rtp_sink_template =
147 GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink",
148     GST_PAD_SINK,
149     GST_PAD_REQUEST,
150     GST_STATIC_CAPS ("application/x-rtp")
151     );
152
153 static GstStaticPadTemplate rtpsession_recv_rtcp_sink_template =
154 GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink",
155     GST_PAD_SINK,
156     GST_PAD_REQUEST,
157     GST_STATIC_CAPS ("application/x-rtcp")
158     );
159
160 static GstStaticPadTemplate rtpsession_send_rtp_sink_template =
161 GST_STATIC_PAD_TEMPLATE ("send_rtp_sink",
162     GST_PAD_SINK,
163     GST_PAD_REQUEST,
164     GST_STATIC_CAPS ("application/x-rtp")
165     );
166
167 /* src pads */
168 static GstStaticPadTemplate rtpsession_recv_rtp_src_template =
169 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src",
170     GST_PAD_SRC,
171     GST_PAD_SOMETIMES,
172     GST_STATIC_CAPS ("application/x-rtp")
173     );
174
175 static GstStaticPadTemplate rtpsession_sync_src_template =
176 GST_STATIC_PAD_TEMPLATE ("sync_src",
177     GST_PAD_SRC,
178     GST_PAD_SOMETIMES,
179     GST_STATIC_CAPS ("application/x-rtcp")
180     );
181
182 static GstStaticPadTemplate rtpsession_send_rtp_src_template =
183 GST_STATIC_PAD_TEMPLATE ("send_rtp_src",
184     GST_PAD_SRC,
185     GST_PAD_SOMETIMES,
186     GST_STATIC_CAPS ("application/x-rtp")
187     );
188
189 static GstStaticPadTemplate rtpsession_send_rtcp_src_template =
190 GST_STATIC_PAD_TEMPLATE ("send_rtcp_src",
191     GST_PAD_SRC,
192     GST_PAD_REQUEST,
193     GST_STATIC_CAPS ("application/x-rtcp")
194     );
195
196 /* signals and args */
197 enum
198 {
199   SIGNAL_REQUEST_PT_MAP,
200   SIGNAL_CLEAR_PT_MAP,
201
202   SIGNAL_ON_NEW_SSRC,
203   SIGNAL_ON_SSRC_COLLISION,
204   SIGNAL_ON_SSRC_VALIDATED,
205   SIGNAL_ON_SSRC_ACTIVE,
206   SIGNAL_ON_SSRC_SDES,
207   SIGNAL_ON_BYE_SSRC,
208   SIGNAL_ON_BYE_TIMEOUT,
209   SIGNAL_ON_TIMEOUT,
210   SIGNAL_ON_SENDER_TIMEOUT,
211   LAST_SIGNAL
212 };
213
214 #define DEFAULT_BANDWIDTH            0
215 #define DEFAULT_RTCP_FRACTION        RTP_STATS_RTCP_FRACTION
216 #define DEFAULT_RTCP_RR_BANDWIDTH    -1
217 #define DEFAULT_RTCP_RS_BANDWIDTH    -1
218 #define DEFAULT_SDES                 NULL
219 #define DEFAULT_NUM_SOURCES          0
220 #define DEFAULT_NUM_ACTIVE_SOURCES   0
221 #define DEFAULT_USE_PIPELINE_CLOCK   FALSE
222 #define DEFAULT_RTCP_MIN_INTERVAL    (RTP_STATS_MIN_INTERVAL * GST_SECOND)
223 #define DEFAULT_PROBATION            RTP_DEFAULT_PROBATION
224 #define DEFAULT_RTP_PROFILE          GST_RTP_PROFILE_AVP
225 #define DEFAULT_NTP_TIME_SOURCE      GST_RTP_NTP_TIME_SOURCE_NTP
226 #define DEFAULT_RTCP_SYNC_SEND_TIME  TRUE
227
228 enum
229 {
230   PROP_0,
231   PROP_BANDWIDTH,
232   PROP_RTCP_FRACTION,
233   PROP_RTCP_RR_BANDWIDTH,
234   PROP_RTCP_RS_BANDWIDTH,
235   PROP_SDES,
236   PROP_NUM_SOURCES,
237   PROP_NUM_ACTIVE_SOURCES,
238   PROP_INTERNAL_SESSION,
239   PROP_USE_PIPELINE_CLOCK,
240   PROP_RTCP_MIN_INTERVAL,
241   PROP_PROBATION,
242   PROP_STATS,
243   PROP_RTP_PROFILE,
244   PROP_NTP_TIME_SOURCE,
245   PROP_RTCP_SYNC_SEND_TIME
246 };
247
248 #define GST_RTP_SESSION_GET_PRIVATE(obj)  \
249            (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_SESSION, GstRtpSessionPrivate))
250
251 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock (&(sess)->priv->lock)
252 #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock (&(sess)->priv->lock)
253
254 #define GST_RTP_SESSION_WAIT(sess)   g_cond_wait (&(sess)->priv->cond, &(sess)->priv->lock)
255 #define GST_RTP_SESSION_SIGNAL(sess) g_cond_signal (&(sess)->priv->cond)
256
257 struct _GstRtpSessionPrivate
258 {
259   GMutex lock;
260   GCond cond;
261   GstClock *sysclock;
262
263   RTPSession *session;
264
265   /* thread for sending out RTCP */
266   GstClockID id;
267   gboolean stop_thread;
268   GThread *thread;
269   gboolean thread_stopped;
270   gboolean wait_send;
271
272   /* caps mapping */
273   GHashTable *ptmap;
274
275   GstClockTime send_latency;
276
277   gboolean use_pipeline_clock;
278   GstRtpNtpTimeSource ntp_time_source;
279   gboolean rtcp_sync_send_time;
280
281   guint rtx_count;
282 };
283
284 /* callbacks to handle actions from the session manager */
285 static GstFlowReturn gst_rtp_session_process_rtp (RTPSession * sess,
286     RTPSource * src, GstBuffer * buffer, gpointer user_data);
287 static GstFlowReturn gst_rtp_session_send_rtp (RTPSession * sess,
288     RTPSource * src, gpointer data, gpointer user_data);
289 static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess,
290     RTPSource * src, GstBuffer * buffer, gboolean eos, gpointer user_data);
291 static GstFlowReturn gst_rtp_session_sync_rtcp (RTPSession * sess,
292     GstBuffer * buffer, gpointer user_data);
293 static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
294     gpointer user_data);
295 static void gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data);
296 static void gst_rtp_session_request_key_unit (RTPSession * sess,
297     gboolean all_headers, gpointer user_data);
298 static GstClockTime gst_rtp_session_request_time (RTPSession * session,
299     gpointer user_data);
300 static void gst_rtp_session_notify_nack (RTPSession * sess,
301     guint16 seqnum, guint16 blp, guint32 ssrc, gpointer user_data);
302 static void gst_rtp_session_reconfigure (RTPSession * sess, gpointer user_data);
303
304 static RTPSessionCallbacks callbacks = {
305   gst_rtp_session_process_rtp,
306   gst_rtp_session_send_rtp,
307   gst_rtp_session_sync_rtcp,
308   gst_rtp_session_send_rtcp,
309   gst_rtp_session_clock_rate,
310   gst_rtp_session_reconsider,
311   gst_rtp_session_request_key_unit,
312   gst_rtp_session_request_time,
313   gst_rtp_session_notify_nack,
314   gst_rtp_session_reconfigure
315 };
316
317 /* GObject vmethods */
318 static void gst_rtp_session_finalize (GObject * object);
319 static void gst_rtp_session_set_property (GObject * object, guint prop_id,
320     const GValue * value, GParamSpec * pspec);
321 static void gst_rtp_session_get_property (GObject * object, guint prop_id,
322     GValue * value, GParamSpec * pspec);
323
324 /* GstElement vmethods */
325 static GstStateChangeReturn gst_rtp_session_change_state (GstElement * element,
326     GstStateChange transition);
327 static GstPad *gst_rtp_session_request_new_pad (GstElement * element,
328     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
329 static void gst_rtp_session_release_pad (GstElement * element, GstPad * pad);
330
331 static gboolean gst_rtp_session_sink_setcaps (GstPad * pad,
332     GstRtpSession * rtpsession, GstCaps * caps);
333 static gboolean gst_rtp_session_setcaps_send_rtp (GstPad * pad,
334     GstRtpSession * rtpsession, GstCaps * caps);
335
336 static void gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession);
337
338 static GstStructure *gst_rtp_session_create_stats (GstRtpSession * rtpsession);
339
340 static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 };
341
342 static void
343 on_new_ssrc (RTPSession * session, RTPSource * src, GstRtpSession * sess)
344 {
345   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0,
346       src->ssrc);
347 }
348
349 static void
350 on_ssrc_collision (RTPSession * session, RTPSource * src, GstRtpSession * sess)
351 {
352   GstPad *send_rtp_sink;
353
354   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
355       src->ssrc);
356
357   GST_RTP_SESSION_LOCK (sess);
358   if ((send_rtp_sink = sess->send_rtp_sink))
359     gst_object_ref (send_rtp_sink);
360   GST_RTP_SESSION_UNLOCK (sess);
361
362   if (send_rtp_sink) {
363     GstStructure *structure;
364     GstEvent *event;
365     RTPSource *internal_src;
366     guint32 suggested_ssrc;
367
368     structure = gst_structure_new ("GstRTPCollision", "ssrc", G_TYPE_UINT,
369         (guint) src->ssrc, NULL);
370
371     /* if there is no source using the suggested ssrc, most probably because
372      * this ssrc has just collided, suggest upstream to use it */
373     suggested_ssrc = rtp_session_suggest_ssrc (session, NULL);
374     internal_src = rtp_session_get_source_by_ssrc (session, suggested_ssrc);
375     if (!internal_src)
376       gst_structure_set (structure, "suggested-ssrc", G_TYPE_UINT,
377           (guint) suggested_ssrc, NULL);
378     else
379       g_object_unref (internal_src);
380
381     event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, structure);
382     gst_pad_push_event (send_rtp_sink, event);
383     gst_object_unref (send_rtp_sink);
384   }
385 }
386
387 static void
388 on_ssrc_validated (RTPSession * session, RTPSource * src, GstRtpSession * sess)
389 {
390   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
391       src->ssrc);
392 }
393
394 static void
395 on_ssrc_active (RTPSession * session, RTPSource * src, GstRtpSession * sess)
396 {
397   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE], 0,
398       src->ssrc);
399 }
400
401 static void
402 on_ssrc_sdes (RTPSession * session, RTPSource * src, GstRtpSession * sess)
403 {
404   GstStructure *s;
405   GstMessage *m;
406
407   /* convert the new SDES info into a message */
408   RTP_SESSION_LOCK (session);
409   g_object_get (src, "sdes", &s, NULL);
410   RTP_SESSION_UNLOCK (session);
411
412   m = gst_message_new_custom (GST_MESSAGE_ELEMENT, GST_OBJECT (sess), s);
413   gst_element_post_message (GST_ELEMENT_CAST (sess), m);
414
415   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_SDES], 0,
416       src->ssrc);
417 }
418
419 static void
420 on_bye_ssrc (RTPSession * session, RTPSource * src, GstRtpSession * sess)
421 {
422   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0,
423       src->ssrc);
424 }
425
426 static void
427 on_bye_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
428 {
429   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
430       src->ssrc);
431 }
432
433 static void
434 on_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
435 {
436   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_TIMEOUT], 0,
437       src->ssrc);
438 }
439
440 static void
441 on_sender_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
442 {
443   g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
444       src->ssrc);
445 }
446
447 #define gst_rtp_session_parent_class parent_class
448 G_DEFINE_TYPE (GstRtpSession, gst_rtp_session, GST_TYPE_ELEMENT);
449
450 static void
451 gst_rtp_session_class_init (GstRtpSessionClass * klass)
452 {
453   GObjectClass *gobject_class;
454   GstElementClass *gstelement_class;
455
456   gobject_class = (GObjectClass *) klass;
457   gstelement_class = (GstElementClass *) klass;
458
459   g_type_class_add_private (klass, sizeof (GstRtpSessionPrivate));
460
461   gobject_class->finalize = gst_rtp_session_finalize;
462   gobject_class->set_property = gst_rtp_session_set_property;
463   gobject_class->get_property = gst_rtp_session_get_property;
464
465   /**
466    * GstRtpSession::request-pt-map:
467    * @sess: the object which received the signal
468    * @pt: the pt
469    *
470    * Request the payload type as #GstCaps for @pt.
471    */
472   gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP] =
473       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
474       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, request_pt_map),
475       NULL, NULL, g_cclosure_marshal_generic, GST_TYPE_CAPS, 1, G_TYPE_UINT);
476   /**
477    * GstRtpSession::clear-pt-map:
478    * @sess: the object which received the signal
479    *
480    * Clear the cached pt-maps requested with #GstRtpSession::request-pt-map.
481    */
482   gst_rtp_session_signals[SIGNAL_CLEAR_PT_MAP] =
483       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
484       G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpSessionClass, clear_pt_map),
485       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
486
487   /**
488    * GstRtpSession::on-new-ssrc:
489    * @sess: the object which received the signal
490    * @ssrc: the SSRC
491    *
492    * Notify of a new SSRC that entered @session.
493    */
494   gst_rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
495       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
496       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_new_ssrc),
497       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
498   /**
499    * GstRtpSession::on-ssrc_collision:
500    * @sess: the object which received the signal
501    * @ssrc: the SSRC
502    *
503    * Notify when we have an SSRC collision
504    */
505   gst_rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
506       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
507       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
508           on_ssrc_collision), NULL, NULL, g_cclosure_marshal_VOID__UINT,
509       G_TYPE_NONE, 1, G_TYPE_UINT);
510   /**
511    * GstRtpSession::on-ssrc_validated:
512    * @sess: the object which received the signal
513    * @ssrc: the SSRC
514    *
515    * Notify of a new SSRC that became validated.
516    */
517   gst_rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
518       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
519       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
520           on_ssrc_validated), NULL, NULL, g_cclosure_marshal_VOID__UINT,
521       G_TYPE_NONE, 1, G_TYPE_UINT);
522   /**
523    * GstRtpSession::on-ssrc_active:
524    * @sess: the object which received the signal
525    * @ssrc: the SSRC
526    *
527    * Notify of a SSRC that is active, i.e., sending RTCP.
528    */
529   gst_rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
530       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
531       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
532           on_ssrc_active), NULL, NULL, g_cclosure_marshal_VOID__UINT,
533       G_TYPE_NONE, 1, G_TYPE_UINT);
534   /**
535    * GstRtpSession::on-ssrc-sdes:
536    * @session: the object which received the signal
537    * @src: the SSRC
538    *
539    * Notify that a new SDES was received for SSRC.
540    */
541   gst_rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
542       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
543       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_ssrc_sdes),
544       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
545
546   /**
547    * GstRtpSession::on-bye-ssrc:
548    * @sess: the object which received the signal
549    * @ssrc: the SSRC
550    *
551    * Notify of an SSRC that became inactive because of a BYE packet.
552    */
553   gst_rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
554       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
555       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_bye_ssrc),
556       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
557   /**
558    * GstRtpSession::on-bye-timeout:
559    * @sess: the object which received the signal
560    * @ssrc: the SSRC
561    *
562    * Notify of an SSRC that has timed out because of BYE
563    */
564   gst_rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
565       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
566       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_bye_timeout),
567       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
568   /**
569    * GstRtpSession::on-timeout:
570    * @sess: the object which received the signal
571    * @ssrc: the SSRC
572    *
573    * Notify of an SSRC that has timed out
574    */
575   gst_rtp_session_signals[SIGNAL_ON_TIMEOUT] =
576       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
577       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_timeout),
578       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
579   /**
580    * GstRtpSession::on-sender-timeout:
581    * @sess: the object which received the signal
582    * @ssrc: the SSRC
583    *
584    * Notify of a sender SSRC that has timed out and became a receiver
585    */
586   gst_rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
587       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
588       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
589           on_sender_timeout), NULL, NULL, g_cclosure_marshal_VOID__UINT,
590       G_TYPE_NONE, 1, G_TYPE_UINT);
591
592   g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
593       g_param_spec_double ("bandwidth", "Bandwidth",
594           "The bandwidth of the session in bytes per second (0 for auto-discover)",
595           0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
596           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
597
598   g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
599       g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
600           "The RTCP bandwidth of the session in bytes per second "
601           "(or as a real fraction of the RTP bandwidth if < 1.0)",
602           0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
603           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
604
605   g_object_class_install_property (gobject_class, PROP_RTCP_RR_BANDWIDTH,
606       g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth",
607           "The RTCP bandwidth used for receivers in bytes per second (-1 = default)",
608           -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH,
609           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
610
611   g_object_class_install_property (gobject_class, PROP_RTCP_RS_BANDWIDTH,
612       g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth",
613           "The RTCP bandwidth used for senders in bytes per second (-1 = default)",
614           -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH,
615           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
616
617   g_object_class_install_property (gobject_class, PROP_SDES,
618       g_param_spec_boxed ("sdes", "SDES",
619           "The SDES items of this session",
620           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
621
622   g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
623       g_param_spec_uint ("num-sources", "Num Sources",
624           "The number of sources in the session", 0, G_MAXUINT,
625           DEFAULT_NUM_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
626
627   g_object_class_install_property (gobject_class, PROP_NUM_ACTIVE_SOURCES,
628       g_param_spec_uint ("num-active-sources", "Num Active Sources",
629           "The number of active sources in the session", 0, G_MAXUINT,
630           DEFAULT_NUM_ACTIVE_SOURCES,
631           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
632
633   g_object_class_install_property (gobject_class, PROP_INTERNAL_SESSION,
634       g_param_spec_object ("internal-session", "Internal Session",
635           "The internal RTPSession object", RTP_TYPE_SESSION,
636           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
637
638   g_object_class_install_property (gobject_class, PROP_USE_PIPELINE_CLOCK,
639       g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock",
640           "Use the pipeline running-time to set the NTP time in the RTCP SR messages "
641           "(DEPRECATED: Use ntp-time-source property)",
642           DEFAULT_USE_PIPELINE_CLOCK,
643           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
644
645   g_object_class_install_property (gobject_class, PROP_RTCP_MIN_INTERVAL,
646       g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval",
647           "Minimum interval between Regular RTCP packet (in ns)",
648           0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
649           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
650
651   g_object_class_install_property (gobject_class, PROP_PROBATION,
652       g_param_spec_uint ("probation", "Number of probations",
653           "Consecutive packet sequence numbers to accept the source",
654           0, G_MAXUINT, DEFAULT_PROBATION,
655           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
656
657   /**
658    * GstRtpSession::stats:
659    *
660    * Various session statistics. This property returns a GstStructure
661    * with name application/x-rtp-session-stats with the following fields:
662    *
663    *  "rtx-count"       G_TYPE_UINT   The number of retransmission events
664    *      received from downstream (in receiver mode)
665    *  "rtx-drop-count"  G_TYPE_UINT   The number of retransmission events
666    *      dropped (due to bandwidth constraints)
667    *  "sent-nack-count" G_TYPE_UINT   Number of NACKs sent
668    *  "recv-nack-count" G_TYPE_UINT   Number of NACKs received
669    *
670    * Since: 1.4
671    */
672   g_object_class_install_property (gobject_class, PROP_STATS,
673       g_param_spec_boxed ("stats", "Statistics",
674           "Various statistics", GST_TYPE_STRUCTURE,
675           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
676
677   g_object_class_install_property (gobject_class, PROP_RTP_PROFILE,
678       g_param_spec_enum ("rtp-profile", "RTP Profile",
679           "RTP profile to use", GST_TYPE_RTP_PROFILE, DEFAULT_RTP_PROFILE,
680           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
681
682   g_object_class_install_property (gobject_class, PROP_NTP_TIME_SOURCE,
683       g_param_spec_enum ("ntp-time-source", "NTP Time Source",
684           "NTP time source for RTCP packets",
685           gst_rtp_ntp_time_source_get_type (), DEFAULT_NTP_TIME_SOURCE,
686           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
687
688   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_SEND_TIME,
689       g_param_spec_boolean ("rtcp-sync-send-time", "RTCP Sync Send Time",
690           "Use send time or capture time for RTCP sync "
691           "(TRUE = send time, FALSE = capture time)",
692           DEFAULT_RTCP_SYNC_SEND_TIME,
693           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
694
695   gstelement_class->change_state =
696       GST_DEBUG_FUNCPTR (gst_rtp_session_change_state);
697   gstelement_class->request_new_pad =
698       GST_DEBUG_FUNCPTR (gst_rtp_session_request_new_pad);
699   gstelement_class->release_pad =
700       GST_DEBUG_FUNCPTR (gst_rtp_session_release_pad);
701
702   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_session_clear_pt_map);
703
704   /* sink pads */
705   gst_element_class_add_pad_template (gstelement_class,
706       gst_static_pad_template_get (&rtpsession_recv_rtp_sink_template));
707   gst_element_class_add_pad_template (gstelement_class,
708       gst_static_pad_template_get (&rtpsession_recv_rtcp_sink_template));
709   gst_element_class_add_pad_template (gstelement_class,
710       gst_static_pad_template_get (&rtpsession_send_rtp_sink_template));
711
712   /* src pads */
713   gst_element_class_add_pad_template (gstelement_class,
714       gst_static_pad_template_get (&rtpsession_recv_rtp_src_template));
715   gst_element_class_add_pad_template (gstelement_class,
716       gst_static_pad_template_get (&rtpsession_sync_src_template));
717   gst_element_class_add_pad_template (gstelement_class,
718       gst_static_pad_template_get (&rtpsession_send_rtp_src_template));
719   gst_element_class_add_pad_template (gstelement_class,
720       gst_static_pad_template_get (&rtpsession_send_rtcp_src_template));
721
722   gst_element_class_set_static_metadata (gstelement_class, "RTP Session",
723       "Filter/Network/RTP",
724       "Implement an RTP session", "Wim Taymans <wim.taymans@gmail.com>");
725
726   GST_DEBUG_CATEGORY_INIT (gst_rtp_session_debug,
727       "rtpsession", 0, "RTP Session");
728 }
729
730 static void
731 gst_rtp_session_init (GstRtpSession * rtpsession)
732 {
733   rtpsession->priv = GST_RTP_SESSION_GET_PRIVATE (rtpsession);
734   g_mutex_init (&rtpsession->priv->lock);
735   g_cond_init (&rtpsession->priv->cond);
736   rtpsession->priv->sysclock = gst_system_clock_obtain ();
737   rtpsession->priv->session = rtp_session_new ();
738   rtpsession->priv->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK;
739   rtpsession->priv->rtcp_sync_send_time = DEFAULT_RTCP_SYNC_SEND_TIME;
740
741   /* configure callbacks */
742   rtp_session_set_callbacks (rtpsession->priv->session, &callbacks, rtpsession);
743   /* configure signals */
744   g_signal_connect (rtpsession->priv->session, "on-new-ssrc",
745       (GCallback) on_new_ssrc, rtpsession);
746   g_signal_connect (rtpsession->priv->session, "on-ssrc-collision",
747       (GCallback) on_ssrc_collision, rtpsession);
748   g_signal_connect (rtpsession->priv->session, "on-ssrc-validated",
749       (GCallback) on_ssrc_validated, rtpsession);
750   g_signal_connect (rtpsession->priv->session, "on-ssrc-active",
751       (GCallback) on_ssrc_active, rtpsession);
752   g_signal_connect (rtpsession->priv->session, "on-ssrc-sdes",
753       (GCallback) on_ssrc_sdes, rtpsession);
754   g_signal_connect (rtpsession->priv->session, "on-bye-ssrc",
755       (GCallback) on_bye_ssrc, rtpsession);
756   g_signal_connect (rtpsession->priv->session, "on-bye-timeout",
757       (GCallback) on_bye_timeout, rtpsession);
758   g_signal_connect (rtpsession->priv->session, "on-timeout",
759       (GCallback) on_timeout, rtpsession);
760   g_signal_connect (rtpsession->priv->session, "on-sender-timeout",
761       (GCallback) on_sender_timeout, rtpsession);
762   rtpsession->priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
763       (GDestroyNotify) gst_caps_unref);
764
765   gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
766   gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
767
768   rtpsession->priv->thread_stopped = TRUE;
769
770   rtpsession->priv->rtx_count = 0;
771
772   rtpsession->priv->ntp_time_source = DEFAULT_NTP_TIME_SOURCE;
773 }
774
775 static void
776 gst_rtp_session_finalize (GObject * object)
777 {
778   GstRtpSession *rtpsession;
779
780   rtpsession = GST_RTP_SESSION (object);
781
782   g_hash_table_destroy (rtpsession->priv->ptmap);
783   g_mutex_clear (&rtpsession->priv->lock);
784   g_cond_clear (&rtpsession->priv->cond);
785   g_object_unref (rtpsession->priv->sysclock);
786   g_object_unref (rtpsession->priv->session);
787
788   G_OBJECT_CLASS (parent_class)->finalize (object);
789 }
790
791 static void
792 gst_rtp_session_set_property (GObject * object, guint prop_id,
793     const GValue * value, GParamSpec * pspec)
794 {
795   GstRtpSession *rtpsession;
796   GstRtpSessionPrivate *priv;
797
798   rtpsession = GST_RTP_SESSION (object);
799   priv = rtpsession->priv;
800
801   switch (prop_id) {
802     case PROP_BANDWIDTH:
803       g_object_set_property (G_OBJECT (priv->session), "bandwidth", value);
804       break;
805     case PROP_RTCP_FRACTION:
806       g_object_set_property (G_OBJECT (priv->session), "rtcp-fraction", value);
807       break;
808     case PROP_RTCP_RR_BANDWIDTH:
809       g_object_set_property (G_OBJECT (priv->session), "rtcp-rr-bandwidth",
810           value);
811       break;
812     case PROP_RTCP_RS_BANDWIDTH:
813       g_object_set_property (G_OBJECT (priv->session), "rtcp-rs-bandwidth",
814           value);
815       break;
816     case PROP_SDES:
817       rtp_session_set_sdes_struct (priv->session, g_value_get_boxed (value));
818       break;
819     case PROP_USE_PIPELINE_CLOCK:
820       priv->use_pipeline_clock = g_value_get_boolean (value);
821       break;
822     case PROP_RTCP_MIN_INTERVAL:
823       g_object_set_property (G_OBJECT (priv->session), "rtcp-min-interval",
824           value);
825       break;
826     case PROP_PROBATION:
827       g_object_set_property (G_OBJECT (priv->session), "probation", value);
828       break;
829     case PROP_RTP_PROFILE:
830       g_object_set_property (G_OBJECT (priv->session), "rtp-profile", value);
831       break;
832     case PROP_NTP_TIME_SOURCE:
833       priv->ntp_time_source = g_value_get_enum (value);
834       break;
835     case PROP_RTCP_SYNC_SEND_TIME:
836       priv->rtcp_sync_send_time = g_value_get_boolean (value);
837       break;
838     default:
839       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
840       break;
841   }
842 }
843
844 static void
845 gst_rtp_session_get_property (GObject * object, guint prop_id,
846     GValue * value, GParamSpec * pspec)
847 {
848   GstRtpSession *rtpsession;
849   GstRtpSessionPrivate *priv;
850
851   rtpsession = GST_RTP_SESSION (object);
852   priv = rtpsession->priv;
853
854   switch (prop_id) {
855     case PROP_BANDWIDTH:
856       g_object_get_property (G_OBJECT (priv->session), "bandwidth", value);
857       break;
858     case PROP_RTCP_FRACTION:
859       g_object_get_property (G_OBJECT (priv->session), "rtcp-fraction", value);
860       break;
861     case PROP_RTCP_RR_BANDWIDTH:
862       g_object_get_property (G_OBJECT (priv->session), "rtcp-rr-bandwidth",
863           value);
864       break;
865     case PROP_RTCP_RS_BANDWIDTH:
866       g_object_get_property (G_OBJECT (priv->session), "rtcp-rs-bandwidth",
867           value);
868       break;
869     case PROP_SDES:
870       g_value_take_boxed (value, rtp_session_get_sdes_struct (priv->session));
871       break;
872     case PROP_NUM_SOURCES:
873       g_value_set_uint (value, rtp_session_get_num_sources (priv->session));
874       break;
875     case PROP_NUM_ACTIVE_SOURCES:
876       g_value_set_uint (value,
877           rtp_session_get_num_active_sources (priv->session));
878       break;
879     case PROP_INTERNAL_SESSION:
880       g_value_set_object (value, priv->session);
881       break;
882     case PROP_USE_PIPELINE_CLOCK:
883       g_value_set_boolean (value, priv->use_pipeline_clock);
884       break;
885     case PROP_RTCP_MIN_INTERVAL:
886       g_object_get_property (G_OBJECT (priv->session), "rtcp-min-interval",
887           value);
888       break;
889     case PROP_PROBATION:
890       g_object_get_property (G_OBJECT (priv->session), "probation", value);
891       break;
892     case PROP_STATS:
893       g_value_take_boxed (value, gst_rtp_session_create_stats (rtpsession));
894       break;
895     case PROP_RTP_PROFILE:
896       g_object_get_property (G_OBJECT (priv->session), "rtp-profile", value);
897       break;
898     case PROP_NTP_TIME_SOURCE:
899       g_value_set_enum (value, priv->ntp_time_source);
900       break;
901     case PROP_RTCP_SYNC_SEND_TIME:
902       g_value_set_boolean (value, priv->rtcp_sync_send_time);
903       break;
904     default:
905       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
906       break;
907   }
908 }
909
910 static GstStructure *
911 gst_rtp_session_create_stats (GstRtpSession * rtpsession)
912 {
913   GstStructure *s;
914
915   g_object_get (rtpsession->priv->session, "stats", &s, NULL);
916   gst_structure_set (s, "rtx-count", G_TYPE_UINT, rtpsession->priv->rtx_count,
917       NULL);
918
919   return s;
920 }
921
922 static void
923 get_current_times (GstRtpSession * rtpsession, GstClockTime * running_time,
924     guint64 * ntpnstime)
925 {
926   guint64 ntpns;
927   GstClock *clock;
928   GstClockTime base_time, rt, clock_time;
929
930   GST_OBJECT_LOCK (rtpsession);
931   if ((clock = GST_ELEMENT_CLOCK (rtpsession))) {
932     base_time = GST_ELEMENT_CAST (rtpsession)->base_time;
933     gst_object_ref (clock);
934     GST_OBJECT_UNLOCK (rtpsession);
935
936     /* get current clock time and convert to running time */
937     clock_time = gst_clock_get_time (clock);
938     rt = clock_time - base_time;
939
940     if (rtpsession->priv->use_pipeline_clock) {
941       ntpns = rt;
942       /* add constant to convert from 1970 based time to 1900 based time */
943       ntpns += (2208988800LL * GST_SECOND);
944     } else {
945       switch (rtpsession->priv->ntp_time_source) {
946         case GST_RTP_NTP_TIME_SOURCE_NTP:
947         case GST_RTP_NTP_TIME_SOURCE_UNIX:{
948           GTimeVal current;
949
950           /* get current NTP time */
951           g_get_current_time (&current);
952           ntpns = GST_TIMEVAL_TO_TIME (current);
953
954           /* add constant to convert from 1970 based time to 1900 based time */
955           if (rtpsession->priv->ntp_time_source == GST_RTP_NTP_TIME_SOURCE_NTP)
956             ntpns += (2208988800LL * GST_SECOND);
957           break;
958         }
959         case GST_RTP_NTP_TIME_SOURCE_RUNNING_TIME:
960           ntpns = rt;
961           break;
962         case GST_RTP_NTP_TIME_SOURCE_CLOCK_TIME:
963           ntpns = clock_time;
964           break;
965         default:
966           ntpns = -1;
967           g_assert_not_reached ();
968           break;
969       }
970     }
971
972     gst_object_unref (clock);
973   } else {
974     GST_OBJECT_UNLOCK (rtpsession);
975     rt = -1;
976     ntpns = -1;
977   }
978   if (running_time)
979     *running_time = rt;
980   if (ntpnstime)
981     *ntpnstime = ntpns;
982 }
983
984 static void
985 rtcp_thread (GstRtpSession * rtpsession)
986 {
987   GstClockID id;
988   GstClockTime current_time;
989   GstClockTime next_timeout;
990   guint64 ntpnstime;
991   GstClockTime running_time;
992   RTPSession *session;
993   GstClock *sysclock;
994
995   GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread");
996
997   GST_RTP_SESSION_LOCK (rtpsession);
998
999   while (rtpsession->priv->wait_send) {
1000     GST_LOG_OBJECT (rtpsession, "waiting for getting started");
1001     GST_RTP_SESSION_WAIT (rtpsession);
1002     GST_LOG_OBJECT (rtpsession, "signaled...");
1003   }
1004
1005   sysclock = rtpsession->priv->sysclock;
1006   current_time = gst_clock_get_time (sysclock);
1007
1008   session = rtpsession->priv->session;
1009
1010   GST_DEBUG_OBJECT (rtpsession, "starting at %" GST_TIME_FORMAT,
1011       GST_TIME_ARGS (current_time));
1012   session->start_time = current_time;
1013
1014   while (!rtpsession->priv->stop_thread) {
1015     GstClockReturn res;
1016
1017     /* get initial estimate */
1018     next_timeout = rtp_session_next_timeout (session, current_time);
1019
1020     GST_DEBUG_OBJECT (rtpsession, "next check time %" GST_TIME_FORMAT,
1021         GST_TIME_ARGS (next_timeout));
1022
1023     /* leave if no more timeouts, the session ended */
1024     if (next_timeout == GST_CLOCK_TIME_NONE)
1025       break;
1026
1027     id = rtpsession->priv->id =
1028         gst_clock_new_single_shot_id (sysclock, next_timeout);
1029     GST_RTP_SESSION_UNLOCK (rtpsession);
1030
1031     res = gst_clock_id_wait (id, NULL);
1032
1033     GST_RTP_SESSION_LOCK (rtpsession);
1034     gst_clock_id_unref (id);
1035     rtpsession->priv->id = NULL;
1036
1037     if (rtpsession->priv->stop_thread)
1038       break;
1039
1040     /* update current time */
1041     current_time = gst_clock_get_time (sysclock);
1042
1043     /* get current NTP time */
1044     get_current_times (rtpsession, &running_time, &ntpnstime);
1045
1046     /* we get unlocked because we need to perform reconsideration, don't perform
1047      * the timeout but get a new reporting estimate. */
1048     GST_DEBUG_OBJECT (rtpsession, "unlocked %d, current %" GST_TIME_FORMAT,
1049         res, GST_TIME_ARGS (current_time));
1050
1051     /* perform actions, we ignore result. Release lock because it might push. */
1052     GST_RTP_SESSION_UNLOCK (rtpsession);
1053     rtp_session_on_timeout (session, current_time, ntpnstime, running_time);
1054     GST_RTP_SESSION_LOCK (rtpsession);
1055   }
1056   /* mark the thread as stopped now */
1057   rtpsession->priv->thread_stopped = TRUE;
1058   GST_RTP_SESSION_UNLOCK (rtpsession);
1059
1060   GST_DEBUG_OBJECT (rtpsession, "leaving RTCP thread");
1061 }
1062
1063 static gboolean
1064 start_rtcp_thread (GstRtpSession * rtpsession)
1065 {
1066   GError *error = NULL;
1067   gboolean res;
1068
1069   GST_DEBUG_OBJECT (rtpsession, "starting RTCP thread");
1070
1071   GST_RTP_SESSION_LOCK (rtpsession);
1072   rtpsession->priv->stop_thread = FALSE;
1073   if (rtpsession->priv->thread_stopped) {
1074     /* if the thread stopped, and we still have a handle to the thread, join it
1075      * now. We can safely join with the lock held, the thread will not take it
1076      * anymore. */
1077     if (rtpsession->priv->thread)
1078       g_thread_join (rtpsession->priv->thread);
1079     /* only create a new thread if the old one was stopped. Otherwise we can
1080      * just reuse the currently running one. */
1081     rtpsession->priv->thread = g_thread_try_new ("rtpsession-rtcp-thread",
1082         (GThreadFunc) rtcp_thread, rtpsession, &error);
1083     rtpsession->priv->thread_stopped = FALSE;
1084   }
1085   GST_RTP_SESSION_UNLOCK (rtpsession);
1086
1087   if (error != NULL) {
1088     res = FALSE;
1089     GST_DEBUG_OBJECT (rtpsession, "failed to start thread, %s", error->message);
1090     g_error_free (error);
1091   } else {
1092     res = TRUE;
1093   }
1094   return res;
1095 }
1096
1097 static void
1098 stop_rtcp_thread (GstRtpSession * rtpsession)
1099 {
1100   GST_DEBUG_OBJECT (rtpsession, "stopping RTCP thread");
1101
1102   GST_RTP_SESSION_LOCK (rtpsession);
1103   rtpsession->priv->stop_thread = TRUE;
1104   rtpsession->priv->wait_send = FALSE;
1105   GST_RTP_SESSION_SIGNAL (rtpsession);
1106   if (rtpsession->priv->id)
1107     gst_clock_id_unschedule (rtpsession->priv->id);
1108   GST_RTP_SESSION_UNLOCK (rtpsession);
1109 }
1110
1111 static void
1112 join_rtcp_thread (GstRtpSession * rtpsession)
1113 {
1114   GST_RTP_SESSION_LOCK (rtpsession);
1115   /* don't try to join when we have no thread */
1116   if (rtpsession->priv->thread != NULL) {
1117     GST_DEBUG_OBJECT (rtpsession, "joining RTCP thread");
1118     GST_RTP_SESSION_UNLOCK (rtpsession);
1119
1120     g_thread_join (rtpsession->priv->thread);
1121
1122     GST_RTP_SESSION_LOCK (rtpsession);
1123     /* after the join, take the lock and clear the thread structure. The caller
1124      * is supposed to not concurrently call start and join. */
1125     rtpsession->priv->thread = NULL;
1126   }
1127   GST_RTP_SESSION_UNLOCK (rtpsession);
1128 }
1129
1130 static GstStateChangeReturn
1131 gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
1132 {
1133   GstStateChangeReturn res;
1134   GstRtpSession *rtpsession;
1135
1136   rtpsession = GST_RTP_SESSION (element);
1137
1138   switch (transition) {
1139     case GST_STATE_CHANGE_NULL_TO_READY:
1140       break;
1141     case GST_STATE_CHANGE_READY_TO_PAUSED:
1142       GST_RTP_SESSION_LOCK (rtpsession);
1143       if (rtpsession->send_rtp_src)
1144         rtpsession->priv->wait_send = TRUE;
1145       GST_RTP_SESSION_UNLOCK (rtpsession);
1146       break;
1147     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1148       break;
1149     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1150     case GST_STATE_CHANGE_PAUSED_TO_READY:
1151       /* no need to join yet, we might want to continue later. Also, the
1152        * dataflow could block downstream so that a join could just block
1153        * forever. */
1154       stop_rtcp_thread (rtpsession);
1155       break;
1156     default:
1157       break;
1158   }
1159
1160   res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1161
1162   switch (transition) {
1163     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1164       if (!start_rtcp_thread (rtpsession))
1165         goto failed_thread;
1166       break;
1167     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1168       break;
1169     case GST_STATE_CHANGE_PAUSED_TO_READY:
1170       /* downstream is now releasing the dataflow and we can join. */
1171       join_rtcp_thread (rtpsession);
1172       break;
1173     case GST_STATE_CHANGE_READY_TO_NULL:
1174       break;
1175     default:
1176       break;
1177   }
1178   return res;
1179
1180   /* ERRORS */
1181 failed_thread:
1182   {
1183     return GST_STATE_CHANGE_FAILURE;
1184   }
1185 }
1186
1187 static gboolean
1188 return_true (gpointer key, gpointer value, gpointer user_data)
1189 {
1190   return TRUE;
1191 }
1192
1193 static void
1194 gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession)
1195 {
1196   g_hash_table_foreach_remove (rtpsession->priv->ptmap, return_true, NULL);
1197 }
1198
1199 /* called when the session manager has an RTP packet or a list of packets
1200  * ready for further processing */
1201 static GstFlowReturn
1202 gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src,
1203     GstBuffer * buffer, gpointer user_data)
1204 {
1205   GstFlowReturn result;
1206   GstRtpSession *rtpsession;
1207   GstPad *rtp_src;
1208
1209   rtpsession = GST_RTP_SESSION (user_data);
1210
1211   GST_RTP_SESSION_LOCK (rtpsession);
1212   if ((rtp_src = rtpsession->recv_rtp_src))
1213     gst_object_ref (rtp_src);
1214   GST_RTP_SESSION_UNLOCK (rtpsession);
1215
1216   if (rtp_src) {
1217     GST_LOG_OBJECT (rtpsession, "pushing received RTP packet");
1218     result = gst_pad_push (rtp_src, buffer);
1219     gst_object_unref (rtp_src);
1220   } else {
1221     GST_DEBUG_OBJECT (rtpsession, "dropping received RTP packet");
1222     gst_buffer_unref (buffer);
1223     result = GST_FLOW_OK;
1224   }
1225   return result;
1226 }
1227
1228 /* called when the session manager has an RTP packet ready for further
1229  * sending */
1230 static GstFlowReturn
1231 gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src,
1232     gpointer data, gpointer user_data)
1233 {
1234   GstFlowReturn result;
1235   GstRtpSession *rtpsession;
1236   GstPad *rtp_src;
1237
1238   rtpsession = GST_RTP_SESSION (user_data);
1239
1240   GST_RTP_SESSION_LOCK (rtpsession);
1241   if ((rtp_src = rtpsession->send_rtp_src))
1242     gst_object_ref (rtp_src);
1243   if (rtpsession->priv->wait_send) {
1244     GST_LOG_OBJECT (rtpsession, "signal RTCP thread");
1245     rtpsession->priv->wait_send = FALSE;
1246     GST_RTP_SESSION_SIGNAL (rtpsession);
1247   }
1248   GST_RTP_SESSION_UNLOCK (rtpsession);
1249
1250   if (rtp_src) {
1251     if (GST_IS_BUFFER (data)) {
1252       GST_LOG_OBJECT (rtpsession, "sending RTP packet");
1253       result = gst_pad_push (rtp_src, GST_BUFFER_CAST (data));
1254     } else {
1255       GST_LOG_OBJECT (rtpsession, "sending RTP list");
1256       result = gst_pad_push_list (rtp_src, GST_BUFFER_LIST_CAST (data));
1257     }
1258     gst_object_unref (rtp_src);
1259   } else {
1260     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1261     result = GST_FLOW_OK;
1262   }
1263   return result;
1264 }
1265
1266 static void
1267 do_rtcp_events (GstRtpSession * rtpsession, GstPad * srcpad)
1268 {
1269   GstCaps *caps;
1270   GstSegment seg;
1271   GstEvent *event;
1272   gchar *stream_id;
1273   gboolean have_group_id;
1274   guint group_id;
1275
1276   stream_id =
1277       g_strdup_printf ("%08x%08x%08x%08x", g_random_int (), g_random_int (),
1278       g_random_int (), g_random_int ());
1279
1280   GST_RTP_SESSION_LOCK (rtpsession);
1281   if (rtpsession->recv_rtp_sink) {
1282     event =
1283         gst_pad_get_sticky_event (rtpsession->recv_rtp_sink,
1284         GST_EVENT_STREAM_START, 0);
1285     if (event) {
1286       if (gst_event_parse_group_id (event, &group_id))
1287         have_group_id = TRUE;
1288       else
1289         have_group_id = FALSE;
1290       gst_event_unref (event);
1291     } else {
1292       have_group_id = TRUE;
1293       group_id = gst_util_group_id_next ();
1294     }
1295   } else {
1296     have_group_id = TRUE;
1297     group_id = gst_util_group_id_next ();
1298   }
1299   GST_RTP_SESSION_UNLOCK (rtpsession);
1300
1301   event = gst_event_new_stream_start (stream_id);
1302   if (have_group_id)
1303     gst_event_set_group_id (event, group_id);
1304   gst_pad_push_event (srcpad, event);
1305   g_free (stream_id);
1306
1307   caps = gst_caps_new_empty_simple ("application/x-rtcp");
1308   gst_pad_set_caps (srcpad, caps);
1309   gst_caps_unref (caps);
1310
1311   gst_segment_init (&seg, GST_FORMAT_TIME);
1312   event = gst_event_new_segment (&seg);
1313   gst_pad_push_event (srcpad, event);
1314 }
1315
1316 /* called when the session manager has an RTCP packet ready for further
1317  * sending. The eos flag is set when an EOS event should be sent downstream as
1318  * well. */
1319 static GstFlowReturn
1320 gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src,
1321     GstBuffer * buffer, gboolean eos, gpointer user_data)
1322 {
1323   GstFlowReturn result;
1324   GstRtpSession *rtpsession;
1325   GstPad *rtcp_src;
1326
1327   rtpsession = GST_RTP_SESSION (user_data);
1328
1329   GST_RTP_SESSION_LOCK (rtpsession);
1330   if (rtpsession->priv->stop_thread)
1331     goto stopping;
1332
1333   if ((rtcp_src = rtpsession->send_rtcp_src)) {
1334     gst_object_ref (rtcp_src);
1335     GST_RTP_SESSION_UNLOCK (rtpsession);
1336
1337     /* set rtcp caps on output pad */
1338     if (!gst_pad_has_current_caps (rtcp_src))
1339       do_rtcp_events (rtpsession, rtcp_src);
1340
1341     GST_LOG_OBJECT (rtpsession, "sending RTCP");
1342     result = gst_pad_push (rtcp_src, buffer);
1343
1344     /* we have to send EOS after this packet */
1345     if (eos) {
1346       GST_LOG_OBJECT (rtpsession, "sending EOS");
1347       gst_pad_push_event (rtcp_src, gst_event_new_eos ());
1348     }
1349     gst_object_unref (rtcp_src);
1350   } else {
1351     GST_RTP_SESSION_UNLOCK (rtpsession);
1352
1353     GST_DEBUG_OBJECT (rtpsession, "not sending RTCP, no output pad");
1354     gst_buffer_unref (buffer);
1355     result = GST_FLOW_OK;
1356   }
1357   return result;
1358
1359   /* ERRORS */
1360 stopping:
1361   {
1362     GST_DEBUG_OBJECT (rtpsession, "we are stopping");
1363     gst_buffer_unref (buffer);
1364     GST_RTP_SESSION_UNLOCK (rtpsession);
1365     return GST_FLOW_OK;
1366   }
1367 }
1368
1369 /* called when the session manager has an SR RTCP packet ready for handling
1370  * inter stream synchronisation */
1371 static GstFlowReturn
1372 gst_rtp_session_sync_rtcp (RTPSession * sess,
1373     GstBuffer * buffer, gpointer user_data)
1374 {
1375   GstFlowReturn result;
1376   GstRtpSession *rtpsession;
1377   GstPad *sync_src;
1378
1379   rtpsession = GST_RTP_SESSION (user_data);
1380
1381   GST_RTP_SESSION_LOCK (rtpsession);
1382   if (rtpsession->priv->stop_thread)
1383     goto stopping;
1384
1385   if ((sync_src = rtpsession->sync_src)) {
1386     gst_object_ref (sync_src);
1387     GST_RTP_SESSION_UNLOCK (rtpsession);
1388
1389     /* set rtcp caps on output pad, this happens
1390      * when we receive RTCP muxed with RTP according
1391      * to RFC5761. Otherwise we would have forwarded
1392      * the events from the recv_rtcp_sink pad already
1393      */
1394     if (!gst_pad_has_current_caps (sync_src))
1395       do_rtcp_events (rtpsession, sync_src);
1396
1397     GST_LOG_OBJECT (rtpsession, "sending Sync RTCP");
1398     result = gst_pad_push (sync_src, buffer);
1399     gst_object_unref (sync_src);
1400   } else {
1401     GST_RTP_SESSION_UNLOCK (rtpsession);
1402
1403     GST_DEBUG_OBJECT (rtpsession, "not sending Sync RTCP, no output pad");
1404     gst_buffer_unref (buffer);
1405     result = GST_FLOW_OK;
1406   }
1407   return result;
1408
1409   /* ERRORS */
1410 stopping:
1411   {
1412     GST_DEBUG_OBJECT (rtpsession, "we are stopping");
1413     gst_buffer_unref (buffer);
1414     GST_RTP_SESSION_UNLOCK (rtpsession);
1415     return GST_FLOW_OK;
1416   }
1417 }
1418
1419 static void
1420 gst_rtp_session_cache_caps (GstRtpSession * rtpsession, GstCaps * caps)
1421 {
1422   GstRtpSessionPrivate *priv;
1423   const GstStructure *s;
1424   gint payload;
1425
1426   priv = rtpsession->priv;
1427
1428   GST_DEBUG_OBJECT (rtpsession, "parsing caps");
1429
1430   s = gst_caps_get_structure (caps, 0);
1431   if (!gst_structure_get_int (s, "payload", &payload))
1432     return;
1433
1434   if (g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (payload)))
1435     return;
1436
1437   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (payload),
1438       gst_caps_ref (caps));
1439 }
1440
1441 static GstCaps *
1442 gst_rtp_session_get_caps_for_pt (GstRtpSession * rtpsession, guint payload)
1443 {
1444   GstCaps *caps = NULL;
1445   GValue args[2] = { {0}, {0} };
1446   GValue ret = { 0 };
1447
1448   GST_RTP_SESSION_LOCK (rtpsession);
1449   caps = g_hash_table_lookup (rtpsession->priv->ptmap,
1450       GINT_TO_POINTER (payload));
1451   if (caps) {
1452     gst_caps_ref (caps);
1453     goto done;
1454   }
1455
1456   /* not found in the cache, try to get it with a signal */
1457   g_value_init (&args[0], GST_TYPE_ELEMENT);
1458   g_value_set_object (&args[0], rtpsession);
1459   g_value_init (&args[1], G_TYPE_UINT);
1460   g_value_set_uint (&args[1], payload);
1461
1462   g_value_init (&ret, GST_TYPE_CAPS);
1463   g_value_set_boxed (&ret, NULL);
1464
1465   GST_RTP_SESSION_UNLOCK (rtpsession);
1466
1467   g_signal_emitv (args, gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP], 0,
1468       &ret);
1469
1470   GST_RTP_SESSION_LOCK (rtpsession);
1471
1472   g_value_unset (&args[0]);
1473   g_value_unset (&args[1]);
1474   caps = (GstCaps *) g_value_dup_boxed (&ret);
1475   g_value_unset (&ret);
1476   if (!caps)
1477     goto no_caps;
1478
1479   gst_rtp_session_cache_caps (rtpsession, caps);
1480
1481 done:
1482   GST_RTP_SESSION_UNLOCK (rtpsession);
1483
1484   return caps;
1485
1486 no_caps:
1487   {
1488     GST_DEBUG_OBJECT (rtpsession, "could not get caps");
1489     goto done;
1490   }
1491 }
1492
1493 /* called when the session manager needs the clock rate */
1494 static gint
1495 gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
1496     gpointer user_data)
1497 {
1498   gint result = -1;
1499   GstRtpSession *rtpsession;
1500   GstCaps *caps;
1501   const GstStructure *s;
1502
1503   rtpsession = GST_RTP_SESSION_CAST (user_data);
1504
1505   caps = gst_rtp_session_get_caps_for_pt (rtpsession, payload);
1506
1507   if (!caps)
1508     goto done;
1509
1510   s = gst_caps_get_structure (caps, 0);
1511   if (!gst_structure_get_int (s, "clock-rate", &result))
1512     goto no_clock_rate;
1513
1514   gst_caps_unref (caps);
1515
1516   GST_DEBUG_OBJECT (rtpsession, "parsed clock-rate %d", result);
1517
1518 done:
1519
1520   return result;
1521
1522   /* ERRORS */
1523 no_clock_rate:
1524   {
1525     gst_caps_unref (caps);
1526     GST_DEBUG_OBJECT (rtpsession, "No clock-rate in caps!");
1527     goto done;
1528   }
1529 }
1530
1531 /* called when the session manager asks us to reconsider the timeout */
1532 static void
1533 gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data)
1534 {
1535   GstRtpSession *rtpsession;
1536
1537   rtpsession = GST_RTP_SESSION_CAST (user_data);
1538
1539   GST_RTP_SESSION_LOCK (rtpsession);
1540   GST_DEBUG_OBJECT (rtpsession, "unlock timer for reconsideration");
1541   if (rtpsession->priv->id)
1542     gst_clock_id_unschedule (rtpsession->priv->id);
1543   GST_RTP_SESSION_UNLOCK (rtpsession);
1544 }
1545
1546 static gboolean
1547 gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstObject * parent,
1548     GstEvent * event)
1549 {
1550   GstRtpSession *rtpsession;
1551   gboolean ret = FALSE;
1552
1553   rtpsession = GST_RTP_SESSION (parent);
1554
1555   GST_DEBUG_OBJECT (rtpsession, "received event %s",
1556       GST_EVENT_TYPE_NAME (event));
1557
1558   switch (GST_EVENT_TYPE (event)) {
1559     case GST_EVENT_CAPS:
1560     {
1561       GstCaps *caps;
1562
1563       /* process */
1564       gst_event_parse_caps (event, &caps);
1565       gst_rtp_session_sink_setcaps (pad, rtpsession, caps);
1566       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1567       break;
1568     }
1569     case GST_EVENT_FLUSH_STOP:
1570       gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
1571       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1572       break;
1573     case GST_EVENT_SEGMENT:
1574     {
1575       GstSegment *segment, in_segment;
1576
1577       segment = &rtpsession->recv_rtp_seg;
1578
1579       /* the newsegment event is needed to convert the RTP timestamp to
1580        * running_time, which is needed to generate a mapping from RTP to NTP
1581        * timestamps in SR reports */
1582       gst_event_copy_segment (event, &in_segment);
1583       GST_DEBUG_OBJECT (rtpsession, "received segment %" GST_SEGMENT_FORMAT,
1584           &in_segment);
1585
1586       /* accept upstream */
1587       gst_segment_copy_into (&in_segment, segment);
1588
1589       /* push event forward */
1590       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1591       break;
1592     }
1593     case GST_EVENT_EOS:
1594     {
1595       GstPad *rtcp_src;
1596
1597       ret =
1598           gst_pad_push_event (rtpsession->recv_rtp_src, gst_event_ref (event));
1599
1600       GST_RTP_SESSION_LOCK (rtpsession);
1601       if ((rtcp_src = rtpsession->send_rtcp_src))
1602         gst_object_ref (rtcp_src);
1603       GST_RTP_SESSION_UNLOCK (rtpsession);
1604
1605       if (rtcp_src) {
1606         ret = gst_pad_push_event (rtcp_src, event);
1607         gst_object_unref (rtcp_src);
1608       } else {
1609         gst_event_unref (event);
1610         ret = TRUE;
1611       }
1612       break;
1613     }
1614     default:
1615       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
1616       break;
1617   }
1618
1619   return ret;
1620
1621 }
1622
1623 static gboolean
1624 gst_rtp_session_request_remote_key_unit (GstRtpSession * rtpsession,
1625     guint32 ssrc, guint payload, gboolean all_headers, gint count)
1626 {
1627   GstCaps *caps;
1628
1629   caps = gst_rtp_session_get_caps_for_pt (rtpsession, payload);
1630
1631   if (caps) {
1632     const GstStructure *s = gst_caps_get_structure (caps, 0);
1633     gboolean pli;
1634     gboolean fir;
1635
1636     pli = gst_structure_has_field (s, "rtcp-fb-nack-pli");
1637     fir = gst_structure_has_field (s, "rtcp-fb-ccm-fir") && all_headers;
1638
1639     /* Google Talk uses FIR for repair, so send it even if we just want a
1640      * regular PLI */
1641     if (!pli &&
1642         gst_structure_has_field (s, "rtcp-fb-x-gstreamer-fir-as-repair"))
1643       fir = TRUE;
1644
1645     gst_caps_unref (caps);
1646
1647     if (pli || fir)
1648       return rtp_session_request_key_unit (rtpsession->priv->session, ssrc,
1649           fir, count);
1650   }
1651
1652   return FALSE;
1653 }
1654
1655 static gboolean
1656 gst_rtp_session_event_recv_rtp_src (GstPad * pad, GstObject * parent,
1657     GstEvent * event)
1658 {
1659   GstRtpSession *rtpsession;
1660   gboolean forward = TRUE;
1661   gboolean ret = TRUE;
1662   const GstStructure *s;
1663   guint32 ssrc;
1664   guint pt;
1665
1666   rtpsession = GST_RTP_SESSION (parent);
1667
1668   switch (GST_EVENT_TYPE (event)) {
1669     case GST_EVENT_CUSTOM_UPSTREAM:
1670       s = gst_event_get_structure (event);
1671       if (gst_structure_has_name (s, "GstForceKeyUnit") &&
1672           gst_structure_get_uint (s, "ssrc", &ssrc) &&
1673           gst_structure_get_uint (s, "payload", &pt)) {
1674         gboolean all_headers = FALSE;
1675         gint count = -1;
1676
1677         gst_structure_get_boolean (s, "all-headers", &all_headers);
1678         if (gst_structure_get_int (s, "count", &count) && count < 0)
1679           count += G_MAXINT;    /* Make sure count is positive if present */
1680         if (gst_rtp_session_request_remote_key_unit (rtpsession, ssrc, pt,
1681                 all_headers, count))
1682           forward = FALSE;
1683       } else if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
1684         GstClockTime running_time;
1685         guint seqnum, delay, deadline, max_delay, avg_rtt;
1686
1687         GST_RTP_SESSION_LOCK (rtpsession);
1688         rtpsession->priv->rtx_count++;
1689         GST_RTP_SESSION_UNLOCK (rtpsession);
1690
1691         if (!gst_structure_get_clock_time (s, "running-time", &running_time))
1692           running_time = -1;
1693         if (!gst_structure_get_uint (s, "ssrc", &ssrc))
1694           ssrc = -1;
1695         if (!gst_structure_get_uint (s, "seqnum", &seqnum))
1696           seqnum = -1;
1697         if (!gst_structure_get_uint (s, "delay", &delay))
1698           delay = 0;
1699         if (!gst_structure_get_uint (s, "deadline", &deadline))
1700           deadline = 100;
1701         if (!gst_structure_get_uint (s, "avg-rtt", &avg_rtt))
1702           avg_rtt = 40;
1703
1704         /* remaining time to receive the packet */
1705         max_delay = deadline;
1706         if (max_delay > delay)
1707           max_delay -= delay;
1708         /* estimated RTT */
1709         if (max_delay > avg_rtt)
1710           max_delay -= avg_rtt;
1711         else
1712           max_delay = 0;
1713
1714         if (rtp_session_request_nack (rtpsession->priv->session, ssrc, seqnum,
1715                 max_delay * GST_MSECOND))
1716           forward = FALSE;
1717       }
1718       break;
1719     default:
1720       break;
1721   }
1722
1723   if (forward) {
1724     GstPad *recv_rtp_sink;
1725
1726     GST_RTP_SESSION_LOCK (rtpsession);
1727     if ((recv_rtp_sink = rtpsession->recv_rtp_sink))
1728       gst_object_ref (recv_rtp_sink);
1729     GST_RTP_SESSION_UNLOCK (rtpsession);
1730
1731     if (recv_rtp_sink) {
1732       ret = gst_pad_push_event (recv_rtp_sink, event);
1733       gst_object_unref (recv_rtp_sink);
1734     } else
1735       gst_event_unref (event);
1736   } else {
1737     gst_event_unref (event);
1738   }
1739
1740   return ret;
1741 }
1742
1743
1744 static GstIterator *
1745 gst_rtp_session_iterate_internal_links (GstPad * pad, GstObject * parent)
1746 {
1747   GstRtpSession *rtpsession;
1748   GstPad *otherpad = NULL;
1749   GstIterator *it = NULL;
1750
1751   rtpsession = GST_RTP_SESSION (parent);
1752
1753   GST_RTP_SESSION_LOCK (rtpsession);
1754   if (pad == rtpsession->recv_rtp_src) {
1755     otherpad = gst_object_ref (rtpsession->recv_rtp_sink);
1756   } else if (pad == rtpsession->recv_rtp_sink) {
1757     otherpad = gst_object_ref (rtpsession->recv_rtp_src);
1758   } else if (pad == rtpsession->send_rtp_src) {
1759     otherpad = gst_object_ref (rtpsession->send_rtp_sink);
1760   } else if (pad == rtpsession->send_rtp_sink) {
1761     otherpad = gst_object_ref (rtpsession->send_rtp_src);
1762   }
1763   GST_RTP_SESSION_UNLOCK (rtpsession);
1764
1765   if (otherpad) {
1766     GValue val = { 0, };
1767
1768     g_value_init (&val, GST_TYPE_PAD);
1769     g_value_set_object (&val, otherpad);
1770     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
1771     g_value_unset (&val);
1772     gst_object_unref (otherpad);
1773   } else {
1774     it = gst_iterator_new_single (GST_TYPE_PAD, NULL);
1775   }
1776
1777   return it;
1778 }
1779
1780 static gboolean
1781 gst_rtp_session_sink_setcaps (GstPad * pad, GstRtpSession * rtpsession,
1782     GstCaps * caps)
1783 {
1784   GST_RTP_SESSION_LOCK (rtpsession);
1785   gst_rtp_session_cache_caps (rtpsession, caps);
1786   GST_RTP_SESSION_UNLOCK (rtpsession);
1787
1788   return TRUE;
1789 }
1790
1791 /* receive a packet from a sender, send it to the RTP session manager and
1792  * forward the packet on the rtp_src pad
1793  */
1794 static GstFlowReturn
1795 gst_rtp_session_chain_recv_rtp (GstPad * pad, GstObject * parent,
1796     GstBuffer * buffer)
1797 {
1798   GstRtpSession *rtpsession;
1799   GstRtpSessionPrivate *priv;
1800   GstFlowReturn ret;
1801   GstClockTime current_time, running_time;
1802   GstClockTime timestamp;
1803   guint64 ntpnstime;
1804
1805   rtpsession = GST_RTP_SESSION (parent);
1806   priv = rtpsession->priv;
1807
1808   GST_LOG_OBJECT (rtpsession, "received RTP packet");
1809
1810   GST_RTP_SESSION_LOCK (rtpsession);
1811   if (rtpsession->priv->wait_send) {
1812     GST_LOG_OBJECT (rtpsession, "signal RTCP thread");
1813     rtpsession->priv->wait_send = FALSE;
1814     GST_RTP_SESSION_SIGNAL (rtpsession);
1815   }
1816   GST_RTP_SESSION_UNLOCK (rtpsession);
1817
1818   /* get NTP time when this packet was captured, this depends on the timestamp. */
1819   timestamp = GST_BUFFER_PTS (buffer);
1820   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
1821     /* convert to running time using the segment values */
1822     running_time =
1823         gst_segment_to_running_time (&rtpsession->recv_rtp_seg, GST_FORMAT_TIME,
1824         timestamp);
1825     ntpnstime = GST_CLOCK_TIME_NONE;
1826   } else {
1827     get_current_times (rtpsession, &running_time, &ntpnstime);
1828   }
1829   current_time = gst_clock_get_time (priv->sysclock);
1830
1831   ret = rtp_session_process_rtp (priv->session, buffer, current_time,
1832       running_time, ntpnstime);
1833   if (ret != GST_FLOW_OK)
1834     goto push_error;
1835
1836 done:
1837
1838   return ret;
1839
1840   /* ERRORS */
1841 push_error:
1842   {
1843     GST_DEBUG_OBJECT (rtpsession, "process returned %s",
1844         gst_flow_get_name (ret));
1845     goto done;
1846   }
1847 }
1848
1849 static gboolean
1850 gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstObject * parent,
1851     GstEvent * event)
1852 {
1853   GstRtpSession *rtpsession;
1854   gboolean ret = FALSE;
1855
1856   rtpsession = GST_RTP_SESSION (parent);
1857
1858   GST_DEBUG_OBJECT (rtpsession, "received event %s",
1859       GST_EVENT_TYPE_NAME (event));
1860
1861   switch (GST_EVENT_TYPE (event)) {
1862     case GST_EVENT_SEGMENT:
1863       /* Make sure that the sync_src pad has caps before the segment event.
1864        * Otherwise we might get a segment event before caps from the receive
1865        * RTCP pad, and then later when receiving RTCP packets will set caps.
1866        * This will results in a sticky event misordering warning
1867        */
1868       if (!gst_pad_has_current_caps (rtpsession->sync_src)) {
1869         GstCaps *caps = gst_caps_new_empty_simple ("application/x-rtcp");
1870         gst_pad_set_caps (rtpsession->sync_src, caps);
1871         gst_caps_unref (caps);
1872       }
1873       /* fall through */
1874     default:
1875       ret = gst_pad_push_event (rtpsession->sync_src, event);
1876       break;
1877   }
1878
1879   return ret;
1880 }
1881
1882 /* Receive an RTCP packet from a sender, send it to the RTP session manager and
1883  * forward the SR packets to the sync_src pad.
1884  */
1885 static GstFlowReturn
1886 gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstObject * parent,
1887     GstBuffer * buffer)
1888 {
1889   GstRtpSession *rtpsession;
1890   GstRtpSessionPrivate *priv;
1891   GstClockTime current_time;
1892   guint64 ntpnstime;
1893
1894   rtpsession = GST_RTP_SESSION (parent);
1895   priv = rtpsession->priv;
1896
1897   GST_LOG_OBJECT (rtpsession, "received RTCP packet");
1898
1899   GST_RTP_SESSION_LOCK (rtpsession);
1900   if (rtpsession->priv->wait_send) {
1901     GST_LOG_OBJECT (rtpsession, "signal RTCP thread");
1902     rtpsession->priv->wait_send = FALSE;
1903     GST_RTP_SESSION_SIGNAL (rtpsession);
1904   }
1905   GST_RTP_SESSION_UNLOCK (rtpsession);
1906
1907   current_time = gst_clock_get_time (priv->sysclock);
1908   get_current_times (rtpsession, NULL, &ntpnstime);
1909
1910   rtp_session_process_rtcp (priv->session, buffer, current_time, ntpnstime);
1911
1912   return GST_FLOW_OK;           /* always return OK */
1913 }
1914
1915 static gboolean
1916 gst_rtp_session_query_send_rtcp_src (GstPad * pad, GstObject * parent,
1917     GstQuery * query)
1918 {
1919   GstRtpSession *rtpsession;
1920   gboolean ret = FALSE;
1921
1922   rtpsession = GST_RTP_SESSION (parent);
1923
1924   GST_DEBUG_OBJECT (rtpsession, "received QUERY %s",
1925       GST_QUERY_TYPE_NAME (query));
1926
1927   switch (GST_QUERY_TYPE (query)) {
1928     case GST_QUERY_LATENCY:
1929       ret = TRUE;
1930       /* use the defaults for the latency query. */
1931       gst_query_set_latency (query, FALSE, 0, -1);
1932       break;
1933     default:
1934       /* other queries simply fail for now */
1935       break;
1936   }
1937
1938   return ret;
1939 }
1940
1941 static gboolean
1942 gst_rtp_session_event_send_rtcp_src (GstPad * pad, GstObject * parent,
1943     GstEvent * event)
1944 {
1945   GstRtpSession *rtpsession;
1946   gboolean ret = TRUE;
1947
1948   rtpsession = GST_RTP_SESSION (parent);
1949   GST_DEBUG_OBJECT (rtpsession, "received EVENT %s",
1950       GST_EVENT_TYPE_NAME (event));
1951
1952   switch (GST_EVENT_TYPE (event)) {
1953     case GST_EVENT_SEEK:
1954     case GST_EVENT_LATENCY:
1955       gst_event_unref (event);
1956       ret = TRUE;
1957       break;
1958     default:
1959       /* other events simply fail for now */
1960       gst_event_unref (event);
1961       ret = FALSE;
1962       break;
1963   }
1964
1965   return ret;
1966 }
1967
1968
1969 static gboolean
1970 gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstObject * parent,
1971     GstEvent * event)
1972 {
1973   GstRtpSession *rtpsession;
1974   gboolean ret = FALSE;
1975
1976   rtpsession = GST_RTP_SESSION (parent);
1977
1978   GST_DEBUG_OBJECT (rtpsession, "received EVENT %s",
1979       GST_EVENT_TYPE_NAME (event));
1980
1981   switch (GST_EVENT_TYPE (event)) {
1982     case GST_EVENT_CAPS:
1983     {
1984       GstCaps *caps;
1985
1986       /* process */
1987       gst_event_parse_caps (event, &caps);
1988       gst_rtp_session_setcaps_send_rtp (pad, rtpsession, caps);
1989       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
1990       break;
1991     }
1992     case GST_EVENT_FLUSH_STOP:
1993       gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
1994       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
1995       break;
1996     case GST_EVENT_SEGMENT:{
1997       GstSegment *segment, in_segment;
1998
1999       segment = &rtpsession->send_rtp_seg;
2000
2001       /* the newsegment event is needed to convert the RTP timestamp to
2002        * running_time, which is needed to generate a mapping from RTP to NTP
2003        * timestamps in SR reports */
2004       gst_event_copy_segment (event, &in_segment);
2005       GST_DEBUG_OBJECT (rtpsession, "received segment %" GST_SEGMENT_FORMAT,
2006           &in_segment);
2007
2008       /* accept upstream */
2009       gst_segment_copy_into (&in_segment, segment);
2010
2011       /* push event forward */
2012       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
2013       break;
2014     }
2015     case GST_EVENT_EOS:{
2016       GstClockTime current_time;
2017
2018       /* push downstream FIXME, we are not supposed to leave the session just
2019        * because we stop sending. */
2020       ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
2021       current_time = gst_clock_get_time (rtpsession->priv->sysclock);
2022
2023       GST_DEBUG_OBJECT (rtpsession, "scheduling BYE message");
2024       rtp_session_mark_all_bye (rtpsession->priv->session, "End Of Stream");
2025       rtp_session_schedule_bye (rtpsession->priv->session, current_time);
2026       break;
2027     }
2028     default:{
2029       GstPad *send_rtp_src;
2030
2031       GST_RTP_SESSION_LOCK (rtpsession);
2032       if ((send_rtp_src = rtpsession->send_rtp_src))
2033         gst_object_ref (send_rtp_src);
2034       GST_RTP_SESSION_UNLOCK (rtpsession);
2035
2036       if (send_rtp_src) {
2037         ret = gst_pad_push_event (send_rtp_src, event);
2038         gst_object_unref (send_rtp_src);
2039       } else
2040         gst_event_unref (event);
2041
2042       break;
2043     }
2044   }
2045
2046   return ret;
2047 }
2048
2049 static gboolean
2050 gst_rtp_session_event_send_rtp_src (GstPad * pad, GstObject * parent,
2051     GstEvent * event)
2052 {
2053   GstRtpSession *rtpsession;
2054   gboolean ret = FALSE;
2055
2056   rtpsession = GST_RTP_SESSION (parent);
2057
2058   GST_DEBUG_OBJECT (rtpsession, "received EVENT %s",
2059       GST_EVENT_TYPE_NAME (event));
2060
2061   switch (GST_EVENT_TYPE (event)) {
2062     case GST_EVENT_LATENCY:
2063       /* save the latency, we need this to know when an RTP packet will be
2064        * rendered by the sink */
2065       gst_event_parse_latency (event, &rtpsession->priv->send_latency);
2066
2067       ret = gst_pad_event_default (pad, parent, event);
2068       break;
2069     default:
2070       ret = gst_pad_event_default (pad, parent, event);
2071       break;
2072   }
2073   return ret;
2074 }
2075
2076 static GstCaps *
2077 gst_rtp_session_getcaps_send_rtp (GstPad * pad, GstRtpSession * rtpsession,
2078     GstCaps * filter)
2079 {
2080   GstRtpSessionPrivate *priv;
2081   GstCaps *result;
2082   GstStructure *s1, *s2;
2083   guint ssrc;
2084   gboolean is_random;
2085
2086   priv = rtpsession->priv;
2087
2088   ssrc = rtp_session_suggest_ssrc (priv->session, &is_random);
2089
2090   /* we can basically accept anything but we prefer to receive packets with our
2091    * internal SSRC so that we don't have to patch it. Create a structure with
2092    * the SSRC and another one without.
2093    * Only do this if the session actually decided on an ssrc already,
2094    * otherwise we give upstream the opportunity to select an ssrc itself */
2095   if (!is_random) {
2096     s1 = gst_structure_new ("application/x-rtp", "ssrc", G_TYPE_UINT, ssrc,
2097         NULL);
2098     s2 = gst_structure_new_empty ("application/x-rtp");
2099
2100     result = gst_caps_new_full (s1, s2, NULL);
2101   } else {
2102     result = gst_caps_new_empty_simple ("application/x-rtp");
2103   }
2104
2105   if (filter) {
2106     GstCaps *caps = result;
2107
2108     result = gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
2109     gst_caps_unref (caps);
2110   }
2111
2112   GST_DEBUG_OBJECT (rtpsession, "getting caps %" GST_PTR_FORMAT, result);
2113
2114   return result;
2115 }
2116
2117 static gboolean
2118 gst_rtp_session_query_send_rtp (GstPad * pad, GstObject * parent,
2119     GstQuery * query)
2120 {
2121   gboolean res = FALSE;
2122   GstRtpSession *rtpsession;
2123
2124   rtpsession = GST_RTP_SESSION (parent);
2125
2126   switch (GST_QUERY_TYPE (query)) {
2127     case GST_QUERY_CAPS:
2128     {
2129       GstCaps *filter, *caps;
2130
2131       gst_query_parse_caps (query, &filter);
2132       caps = gst_rtp_session_getcaps_send_rtp (pad, rtpsession, filter);
2133       gst_query_set_caps_result (query, caps);
2134       gst_caps_unref (caps);
2135       res = TRUE;
2136       break;
2137     }
2138     default:
2139       res = gst_pad_query_default (pad, parent, query);
2140       break;
2141   }
2142
2143   return res;
2144 }
2145
2146 static gboolean
2147 gst_rtp_session_setcaps_send_rtp (GstPad * pad, GstRtpSession * rtpsession,
2148     GstCaps * caps)
2149 {
2150   GstRtpSessionPrivate *priv;
2151
2152   priv = rtpsession->priv;
2153
2154   rtp_session_update_send_caps (priv->session, caps);
2155
2156   return TRUE;
2157 }
2158
2159 /* Recieve an RTP packet or a list of packets to be send to the receivers,
2160  * send to RTP session manager and forward to send_rtp_src.
2161  */
2162 static GstFlowReturn
2163 gst_rtp_session_chain_send_rtp_common (GstRtpSession * rtpsession,
2164     gpointer data, gboolean is_list)
2165 {
2166   GstRtpSessionPrivate *priv;
2167   GstFlowReturn ret;
2168   GstClockTime timestamp, running_time;
2169   GstClockTime current_time;
2170
2171   priv = rtpsession->priv;
2172
2173   GST_LOG_OBJECT (rtpsession, "received RTP %s", is_list ? "list" : "packet");
2174
2175   /* get NTP time when this packet was captured, this depends on the timestamp. */
2176   if (is_list) {
2177     GstBuffer *buffer = NULL;
2178
2179     /* All groups in an list have the same timestamp.
2180      * So, just take it from the first group. */
2181     buffer = gst_buffer_list_get (GST_BUFFER_LIST_CAST (data), 0);
2182     if (buffer)
2183       timestamp = GST_BUFFER_PTS (buffer);
2184     else
2185       timestamp = -1;
2186   } else {
2187     timestamp = GST_BUFFER_PTS (GST_BUFFER_CAST (data));
2188   }
2189
2190   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
2191     /* convert to running time using the segment start value. */
2192     running_time =
2193         gst_segment_to_running_time (&rtpsession->send_rtp_seg, GST_FORMAT_TIME,
2194         timestamp);
2195     if (priv->rtcp_sync_send_time)
2196       running_time += priv->send_latency;
2197   } else {
2198     /* no timestamp. */
2199     running_time = -1;
2200   }
2201
2202   current_time = gst_clock_get_time (priv->sysclock);
2203   ret = rtp_session_send_rtp (priv->session, data, is_list, current_time,
2204       running_time);
2205   if (ret != GST_FLOW_OK)
2206     goto push_error;
2207
2208 done:
2209
2210   return ret;
2211
2212   /* ERRORS */
2213 push_error:
2214   {
2215     GST_DEBUG_OBJECT (rtpsession, "process returned %s",
2216         gst_flow_get_name (ret));
2217     goto done;
2218   }
2219 }
2220
2221 static GstFlowReturn
2222 gst_rtp_session_chain_send_rtp (GstPad * pad, GstObject * parent,
2223     GstBuffer * buffer)
2224 {
2225   GstRtpSession *rtpsession = GST_RTP_SESSION (parent);
2226
2227   return gst_rtp_session_chain_send_rtp_common (rtpsession, buffer, FALSE);
2228 }
2229
2230 static GstFlowReturn
2231 gst_rtp_session_chain_send_rtp_list (GstPad * pad, GstObject * parent,
2232     GstBufferList * list)
2233 {
2234   GstRtpSession *rtpsession = GST_RTP_SESSION (parent);
2235
2236   return gst_rtp_session_chain_send_rtp_common (rtpsession, list, TRUE);
2237 }
2238
2239 /* Create sinkpad to receive RTP packets from senders. This will also create a
2240  * srcpad for the RTP packets.
2241  */
2242 static GstPad *
2243 create_recv_rtp_sink (GstRtpSession * rtpsession)
2244 {
2245   GST_DEBUG_OBJECT (rtpsession, "creating RTP sink pad");
2246
2247   rtpsession->recv_rtp_sink =
2248       gst_pad_new_from_static_template (&rtpsession_recv_rtp_sink_template,
2249       "recv_rtp_sink");
2250   gst_pad_set_chain_function (rtpsession->recv_rtp_sink,
2251       gst_rtp_session_chain_recv_rtp);
2252   gst_pad_set_event_function (rtpsession->recv_rtp_sink,
2253       gst_rtp_session_event_recv_rtp_sink);
2254   gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtp_sink,
2255       gst_rtp_session_iterate_internal_links);
2256   GST_PAD_SET_PROXY_ALLOCATION (rtpsession->recv_rtp_sink);
2257   gst_pad_set_active (rtpsession->recv_rtp_sink, TRUE);
2258   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
2259       rtpsession->recv_rtp_sink);
2260
2261   GST_DEBUG_OBJECT (rtpsession, "creating RTP src pad");
2262   rtpsession->recv_rtp_src =
2263       gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template,
2264       "recv_rtp_src");
2265   gst_pad_set_event_function (rtpsession->recv_rtp_src,
2266       gst_rtp_session_event_recv_rtp_src);
2267   gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtp_src,
2268       gst_rtp_session_iterate_internal_links);
2269   gst_pad_use_fixed_caps (rtpsession->recv_rtp_src);
2270   gst_pad_set_active (rtpsession->recv_rtp_src, TRUE);
2271   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_src);
2272
2273   return rtpsession->recv_rtp_sink;
2274 }
2275
2276 /* Remove sinkpad to receive RTP packets from senders. This will also remove
2277  * the srcpad for the RTP packets.
2278  */
2279 static void
2280 remove_recv_rtp_sink (GstRtpSession * rtpsession)
2281 {
2282   GST_DEBUG_OBJECT (rtpsession, "removing RTP sink pad");
2283
2284   /* deactivate from source to sink */
2285   gst_pad_set_active (rtpsession->recv_rtp_src, FALSE);
2286   gst_pad_set_active (rtpsession->recv_rtp_sink, FALSE);
2287
2288   /* remove pads */
2289   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2290       rtpsession->recv_rtp_sink);
2291   rtpsession->recv_rtp_sink = NULL;
2292
2293   GST_DEBUG_OBJECT (rtpsession, "removing RTP src pad");
2294   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2295       rtpsession->recv_rtp_src);
2296   rtpsession->recv_rtp_src = NULL;
2297 }
2298
2299 /* Create a sinkpad to receive RTCP messages from senders, this will also create a
2300  * sync_src pad for the SR packets.
2301  */
2302 static GstPad *
2303 create_recv_rtcp_sink (GstRtpSession * rtpsession)
2304 {
2305   GST_DEBUG_OBJECT (rtpsession, "creating RTCP sink pad");
2306
2307   rtpsession->recv_rtcp_sink =
2308       gst_pad_new_from_static_template (&rtpsession_recv_rtcp_sink_template,
2309       "recv_rtcp_sink");
2310   gst_pad_set_chain_function (rtpsession->recv_rtcp_sink,
2311       gst_rtp_session_chain_recv_rtcp);
2312   gst_pad_set_event_function (rtpsession->recv_rtcp_sink,
2313       gst_rtp_session_event_recv_rtcp_sink);
2314   gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtcp_sink,
2315       gst_rtp_session_iterate_internal_links);
2316   gst_pad_set_active (rtpsession->recv_rtcp_sink, TRUE);
2317   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
2318       rtpsession->recv_rtcp_sink);
2319
2320   GST_DEBUG_OBJECT (rtpsession, "creating sync src pad");
2321   rtpsession->sync_src =
2322       gst_pad_new_from_static_template (&rtpsession_sync_src_template,
2323       "sync_src");
2324   gst_pad_set_iterate_internal_links_function (rtpsession->sync_src,
2325       gst_rtp_session_iterate_internal_links);
2326   gst_pad_use_fixed_caps (rtpsession->sync_src);
2327   gst_pad_set_active (rtpsession->sync_src, TRUE);
2328   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->sync_src);
2329
2330   return rtpsession->recv_rtcp_sink;
2331 }
2332
2333 static void
2334 remove_recv_rtcp_sink (GstRtpSession * rtpsession)
2335 {
2336   GST_DEBUG_OBJECT (rtpsession, "removing RTCP sink pad");
2337
2338   gst_pad_set_active (rtpsession->sync_src, FALSE);
2339   gst_pad_set_active (rtpsession->recv_rtcp_sink, FALSE);
2340
2341   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2342       rtpsession->recv_rtcp_sink);
2343   rtpsession->recv_rtcp_sink = NULL;
2344
2345   GST_DEBUG_OBJECT (rtpsession, "removing sync src pad");
2346   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->sync_src);
2347   rtpsession->sync_src = NULL;
2348 }
2349
2350 /* Create a sinkpad to receive RTP packets for receivers. This will also create a
2351  * send_rtp_src pad.
2352  */
2353 static GstPad *
2354 create_send_rtp_sink (GstRtpSession * rtpsession)
2355 {
2356   GST_DEBUG_OBJECT (rtpsession, "creating pad");
2357
2358   rtpsession->send_rtp_sink =
2359       gst_pad_new_from_static_template (&rtpsession_send_rtp_sink_template,
2360       "send_rtp_sink");
2361   gst_pad_set_chain_function (rtpsession->send_rtp_sink,
2362       gst_rtp_session_chain_send_rtp);
2363   gst_pad_set_chain_list_function (rtpsession->send_rtp_sink,
2364       gst_rtp_session_chain_send_rtp_list);
2365   gst_pad_set_query_function (rtpsession->send_rtp_sink,
2366       gst_rtp_session_query_send_rtp);
2367   gst_pad_set_event_function (rtpsession->send_rtp_sink,
2368       gst_rtp_session_event_send_rtp_sink);
2369   gst_pad_set_iterate_internal_links_function (rtpsession->send_rtp_sink,
2370       gst_rtp_session_iterate_internal_links);
2371   GST_PAD_SET_PROXY_CAPS (rtpsession->send_rtp_sink);
2372   GST_PAD_SET_PROXY_ALLOCATION (rtpsession->send_rtp_sink);
2373   gst_pad_set_active (rtpsession->send_rtp_sink, TRUE);
2374   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
2375       rtpsession->send_rtp_sink);
2376
2377   rtpsession->send_rtp_src =
2378       gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template,
2379       "send_rtp_src");
2380   gst_pad_set_iterate_internal_links_function (rtpsession->send_rtp_src,
2381       gst_rtp_session_iterate_internal_links);
2382   gst_pad_set_event_function (rtpsession->send_rtp_src,
2383       gst_rtp_session_event_send_rtp_src);
2384   GST_PAD_SET_PROXY_CAPS (rtpsession->send_rtp_src);
2385   gst_pad_set_active (rtpsession->send_rtp_src, TRUE);
2386   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src);
2387
2388   return rtpsession->send_rtp_sink;
2389 }
2390
2391 static void
2392 remove_send_rtp_sink (GstRtpSession * rtpsession)
2393 {
2394   GST_DEBUG_OBJECT (rtpsession, "removing pad");
2395
2396   gst_pad_set_active (rtpsession->send_rtp_src, FALSE);
2397   gst_pad_set_active (rtpsession->send_rtp_sink, FALSE);
2398
2399   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2400       rtpsession->send_rtp_sink);
2401   rtpsession->send_rtp_sink = NULL;
2402
2403   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2404       rtpsession->send_rtp_src);
2405   rtpsession->send_rtp_src = NULL;
2406 }
2407
2408 /* Create a srcpad with the RTCP packets to send out.
2409  * This pad will be driven by the RTP session manager when it wants to send out
2410  * RTCP packets.
2411  */
2412 static GstPad *
2413 create_send_rtcp_src (GstRtpSession * rtpsession)
2414 {
2415   GST_DEBUG_OBJECT (rtpsession, "creating pad");
2416
2417   rtpsession->send_rtcp_src =
2418       gst_pad_new_from_static_template (&rtpsession_send_rtcp_src_template,
2419       "send_rtcp_src");
2420   gst_pad_use_fixed_caps (rtpsession->send_rtcp_src);
2421   gst_pad_set_active (rtpsession->send_rtcp_src, TRUE);
2422   gst_pad_set_iterate_internal_links_function (rtpsession->send_rtcp_src,
2423       gst_rtp_session_iterate_internal_links);
2424   gst_pad_set_query_function (rtpsession->send_rtcp_src,
2425       gst_rtp_session_query_send_rtcp_src);
2426   gst_pad_set_event_function (rtpsession->send_rtcp_src,
2427       gst_rtp_session_event_send_rtcp_src);
2428   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
2429       rtpsession->send_rtcp_src);
2430
2431   return rtpsession->send_rtcp_src;
2432 }
2433
2434 static void
2435 remove_send_rtcp_src (GstRtpSession * rtpsession)
2436 {
2437   GST_DEBUG_OBJECT (rtpsession, "removing pad");
2438
2439   gst_pad_set_active (rtpsession->send_rtcp_src, FALSE);
2440
2441   gst_element_remove_pad (GST_ELEMENT_CAST (rtpsession),
2442       rtpsession->send_rtcp_src);
2443   rtpsession->send_rtcp_src = NULL;
2444 }
2445
2446 static GstPad *
2447 gst_rtp_session_request_new_pad (GstElement * element,
2448     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
2449 {
2450   GstRtpSession *rtpsession;
2451   GstElementClass *klass;
2452   GstPad *result;
2453
2454   g_return_val_if_fail (templ != NULL, NULL);
2455   g_return_val_if_fail (GST_IS_RTP_SESSION (element), NULL);
2456
2457   rtpsession = GST_RTP_SESSION (element);
2458   klass = GST_ELEMENT_GET_CLASS (element);
2459
2460   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
2461
2462   GST_RTP_SESSION_LOCK (rtpsession);
2463
2464   /* figure out the template */
2465   if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink")) {
2466     if (rtpsession->recv_rtp_sink != NULL)
2467       goto exists;
2468
2469     result = create_recv_rtp_sink (rtpsession);
2470   } else if (templ == gst_element_class_get_pad_template (klass,
2471           "recv_rtcp_sink")) {
2472     if (rtpsession->recv_rtcp_sink != NULL)
2473       goto exists;
2474
2475     result = create_recv_rtcp_sink (rtpsession);
2476   } else if (templ == gst_element_class_get_pad_template (klass,
2477           "send_rtp_sink")) {
2478     if (rtpsession->send_rtp_sink != NULL)
2479       goto exists;
2480
2481     result = create_send_rtp_sink (rtpsession);
2482   } else if (templ == gst_element_class_get_pad_template (klass,
2483           "send_rtcp_src")) {
2484     if (rtpsession->send_rtcp_src != NULL)
2485       goto exists;
2486
2487     result = create_send_rtcp_src (rtpsession);
2488   } else
2489     goto wrong_template;
2490
2491   GST_RTP_SESSION_UNLOCK (rtpsession);
2492
2493   return result;
2494
2495   /* ERRORS */
2496 wrong_template:
2497   {
2498     GST_RTP_SESSION_UNLOCK (rtpsession);
2499     g_warning ("rtpsession: this is not our template");
2500     return NULL;
2501   }
2502 exists:
2503   {
2504     GST_RTP_SESSION_UNLOCK (rtpsession);
2505     g_warning ("rtpsession: pad already requested");
2506     return NULL;
2507   }
2508 }
2509
2510 static void
2511 gst_rtp_session_release_pad (GstElement * element, GstPad * pad)
2512 {
2513   GstRtpSession *rtpsession;
2514
2515   g_return_if_fail (GST_IS_RTP_SESSION (element));
2516   g_return_if_fail (GST_IS_PAD (pad));
2517
2518   rtpsession = GST_RTP_SESSION (element);
2519
2520   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
2521
2522   GST_RTP_SESSION_LOCK (rtpsession);
2523
2524   if (rtpsession->recv_rtp_sink == pad) {
2525     remove_recv_rtp_sink (rtpsession);
2526   } else if (rtpsession->recv_rtcp_sink == pad) {
2527     remove_recv_rtcp_sink (rtpsession);
2528   } else if (rtpsession->send_rtp_sink == pad) {
2529     remove_send_rtp_sink (rtpsession);
2530   } else if (rtpsession->send_rtcp_src == pad) {
2531     remove_send_rtcp_src (rtpsession);
2532   } else
2533     goto wrong_pad;
2534
2535   GST_RTP_SESSION_UNLOCK (rtpsession);
2536
2537   return;
2538
2539   /* ERRORS */
2540 wrong_pad:
2541   {
2542     GST_RTP_SESSION_UNLOCK (rtpsession);
2543     g_warning ("rtpsession: asked to release an unknown pad");
2544     return;
2545   }
2546 }
2547
2548 static void
2549 gst_rtp_session_request_key_unit (RTPSession * sess,
2550     gboolean all_headers, gpointer user_data)
2551 {
2552   GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
2553   GstEvent *event;
2554   GstPad *send_rtp_sink;
2555
2556   GST_RTP_SESSION_LOCK (rtpsession);
2557   if ((send_rtp_sink = rtpsession->send_rtp_sink))
2558     gst_object_ref (send_rtp_sink);
2559   GST_RTP_SESSION_UNLOCK (rtpsession);
2560
2561   if (send_rtp_sink) {
2562     event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
2563         gst_structure_new ("GstForceKeyUnit",
2564             "all-headers", G_TYPE_BOOLEAN, all_headers, NULL));
2565     gst_pad_push_event (send_rtp_sink, event);
2566     gst_object_unref (send_rtp_sink);
2567   }
2568 }
2569
2570 static GstClockTime
2571 gst_rtp_session_request_time (RTPSession * session, gpointer user_data)
2572 {
2573   GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
2574
2575   return gst_clock_get_time (rtpsession->priv->sysclock);
2576 }
2577
2578 static void
2579 gst_rtp_session_notify_nack (RTPSession * sess, guint16 seqnum,
2580     guint16 blp, guint32 ssrc, gpointer user_data)
2581 {
2582   GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
2583   GstEvent *event;
2584   GstPad *send_rtp_sink;
2585
2586   GST_RTP_SESSION_LOCK (rtpsession);
2587   if ((send_rtp_sink = rtpsession->send_rtp_sink))
2588     gst_object_ref (send_rtp_sink);
2589   GST_RTP_SESSION_UNLOCK (rtpsession);
2590
2591   if (send_rtp_sink) {
2592     while (TRUE) {
2593       event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
2594           gst_structure_new ("GstRTPRetransmissionRequest",
2595               "seqnum", G_TYPE_UINT, (guint) seqnum,
2596               "ssrc", G_TYPE_UINT, (guint) ssrc, NULL));
2597       gst_pad_push_event (send_rtp_sink, event);
2598
2599       if (blp == 0)
2600         break;
2601
2602       seqnum++;
2603       while ((blp & 1) == 0) {
2604         seqnum++;
2605         blp >>= 1;
2606       }
2607       blp >>= 1;
2608     }
2609     gst_object_unref (send_rtp_sink);
2610   }
2611 }
2612
2613 static void
2614 gst_rtp_session_reconfigure (RTPSession * sess, gpointer user_data)
2615 {
2616   GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
2617   GstPad *send_rtp_sink;
2618
2619   GST_RTP_SESSION_LOCK (rtpsession);
2620   if ((send_rtp_sink = rtpsession->send_rtp_sink))
2621     gst_object_ref (send_rtp_sink);
2622   GST_RTP_SESSION_UNLOCK (rtpsession);
2623
2624   if (send_rtp_sink) {
2625     gst_pad_push_event (send_rtp_sink, gst_event_new_reconfigure ());
2626     gst_object_unref (send_rtp_sink);
2627   }
2628 }